You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/06/20 09:55:04 UTC
[flink] branch release-1.14 updated: [FLINK-27420] Recreate metric groups for each new RM to avoid metric loss
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new ff060543bc8 [FLINK-27420] Recreate metric groups for each new RM to avoid metric loss
ff060543bc8 is described below
commit ff060543bc8867bdc97e4c3138c4999894f2909e
Author: Ben Augarten <ba...@gmail.com>
AuthorDate: Wed May 4 18:12:27 2022 -0700
[FLINK-27420] Recreate metric groups for each new RM to avoid metric loss
This closes #20019
---
.../resourcemanager/ResourceManagerFactory.java | 15 ++---
.../ResourceManagerProcessContext.java | 23 ++++----
.../ResourceManagerServiceImplTest.java | 66 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index fab804b7b55..b8901fe03f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -61,11 +61,6 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
Executor ioExecutor)
throws ConfigurationException {
- final ResourceManagerMetricGroup resourceManagerMetricGroup =
- ResourceManagerMetricGroup.create(metricRegistry, hostname);
- final SlotManagerMetricGroup slotManagerMetricGroup =
- SlotManagerMetricGroup.create(metricRegistry, hostname);
-
final Configuration runtimeServicesAndRmConfig =
getEffectiveConfigurationForResourceManagerAndRuntimeServices(configuration);
@@ -84,8 +79,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
- resourceManagerMetricGroup,
- slotManagerMetricGroup,
+ metricRegistry,
+ hostname,
ioExecutor);
}
@@ -98,7 +93,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
context.getRmRuntimeServicesConfig(),
context.getRpcService(),
context.getHighAvailabilityServices(),
- context.getSlotManagerMetricGroup());
+ SlotManagerMetricGroup.create(
+ context.getMetricRegistry(), context.getHostname()));
return createResourceManager(
context.getRmConfig(),
@@ -109,7 +105,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
context.getFatalErrorHandler(),
context.getClusterInformation(),
context.getWebInterfaceUrl(),
- context.getResourceManagerMetricGroup(),
+ ResourceManagerMetricGroup.create(
+ context.getMetricRegistry(), context.getHostname()),
resourceManagerRuntimeServices,
context.getIoExecutor());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
index 79d8ed13d90..ed72b75be2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
@@ -22,8 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -48,8 +47,8 @@ public class ResourceManagerProcessContext {
private final FatalErrorHandler fatalErrorHandler;
private final ClusterInformation clusterInformation;
@Nullable private final String webInterfaceUrl;
- private final ResourceManagerMetricGroup resourceManagerMetricGroup;
- private final SlotManagerMetricGroup slotManagerMetricGroup;
+ private final MetricRegistry metricRegistry;
+ private final String hostname;
private final Executor ioExecutor;
public ResourceManagerProcessContext(
@@ -61,8 +60,8 @@ public class ResourceManagerProcessContext {
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup,
- SlotManagerMetricGroup slotManagerMetricGroup,
+ MetricRegistry metricRegistry,
+ String hostname,
Executor ioExecutor) {
this.rmConfig = checkNotNull(rmConfig);
this.rmRuntimeServicesConfig = checkNotNull(rmRuntimeServicesConfig);
@@ -71,8 +70,8 @@ public class ResourceManagerProcessContext {
this.heartbeatServices = checkNotNull(heartbeatServices);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.clusterInformation = checkNotNull(clusterInformation);
- this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);
- this.slotManagerMetricGroup = checkNotNull(slotManagerMetricGroup);
+ this.metricRegistry = checkNotNull(metricRegistry);
+ this.hostname = checkNotNull(hostname);
this.ioExecutor = checkNotNull(ioExecutor);
this.webInterfaceUrl = webInterfaceUrl;
@@ -111,12 +110,12 @@ public class ResourceManagerProcessContext {
return webInterfaceUrl;
}
- public ResourceManagerMetricGroup getResourceManagerMetricGroup() {
- return resourceManagerMetricGroup;
+ public MetricRegistry getMetricRegistry() {
+ return metricRegistry;
}
- public SlotManagerMetricGroup getSlotManagerMetricGroup() {
- return slotManagerMetricGroup;
+ public String getHostname() {
+ return hostname;
}
public Executor getIoExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index ec793d501b8..80c2fa6af8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.rpc.RpcUtils;
@@ -35,12 +36,17 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
@@ -490,6 +496,59 @@ public class ResourceManagerServiceImplTest extends TestLogger {
deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
}
+ @Test
+ public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
+ final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ TestingMetricRegistry metricRegistry =
+ TestingMetricRegistry.builder()
+ .setRegisterConsumer((a, b, c) -> registeredMetrics.add(b))
+ .setUnregisterConsumer((a, b, c) -> registeredMetrics.remove(b))
+ .build();
+
+ final TestingResourceManagerFactory rmFactory = rmFactoryBuilder.build();
+ resourceManagerService =
+ ResourceManagerServiceImpl.create(
+ rmFactory,
+ new Configuration(),
+ rpcService,
+ haService,
+ heartbeatServices,
+ fatalErrorHandler,
+ clusterInformation,
+ null,
+ metricRegistry,
+ "localhost",
+ ForkJoinPool.commonPool());
+ resourceManagerService.start();
+
+ Assert.assertEquals(0, registeredMetrics.size());
+ // grant leadership
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ assertRmStarted();
+ Set<String> expectedMetrics = new HashSet<>();
+ expectedMetrics.add(MetricNames.NUM_REGISTERED_TASK_MANAGERS);
+ expectedMetrics.add(MetricNames.TASK_SLOTS_TOTAL);
+ expectedMetrics.add(MetricNames.TASK_SLOTS_AVAILABLE);
+ Assert.assertTrue(
+ "Expected RM to register leader metrics",
+ registeredMetrics.containsAll(expectedMetrics));
+
+ // revoke leadership, block until old rm is terminated
+ revokeLeadership();
+
+ Set<String> intersection = new HashSet<>(registeredMetrics);
+ intersection.retainAll(expectedMetrics);
+ Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty());
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ assertRmStarted();
+ Assert.assertTrue(
+ "Expected RM to re-register leader metrics",
+ registeredMetrics.containsAll(expectedMetrics));
+ }
+
private static void blockOnFuture(CompletableFuture<?> future) {
try {
future.get();
@@ -511,4 +570,11 @@ public class ResourceManagerServiceImplTest extends TestLogger {
private void assertRmStarted() throws Exception {
leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
}
+
+ private void revokeLeadership() {
+ ResourceManager<?> leaderResourceManager =
+ resourceManagerService.getLeaderResourceManager();
+ leaderElectionService.notLeader();
+ blockOnFuture(leaderResourceManager.getTerminationFuture());
+ }
}