You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/25 20:26:51 UTC

[flink] branch master updated: [FLINK-22725][coordination] SlotManagers unregister metrics in suspend()

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 75a2aaa  [FLINK-22725][coordination] SlotManagers unregister metrics in suspend()
75a2aaa is described below

commit 75a2aaa4d091a54257e1620dd6d50055dff69e61
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue May 25 22:26:26 2021 +0200

    [FLINK-22725][coordination] SlotManagers unregister metrics in suspend()
---
 .../slotmanager/DeclarativeSlotManager.java        |  3 +-
 .../slotmanager/FineGrainedSlotManager.java        |  3 +-
 .../slotmanager/DeclarativeSlotManagerTest.java    | 36 +++++++++++++++++
 .../slotmanager/FineGrainedSlotManagerTest.java    | 46 ++++++++++++++++++++++
 .../FineGrainedSlotManagerTestBase.java            | 19 ++++++++-
 5 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 1cc4337..ab8e155 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -211,6 +211,8 @@ public class DeclarativeSlotManager implements SlotManager {
 
         LOG.info("Suspending the slot manager.");
 
+        slotManagerMetricGroup.close();
+
         resourceTracker.clear();
         if (taskExecutorManager != null) {
             taskExecutorManager.close();
@@ -238,7 +240,6 @@ public class DeclarativeSlotManager implements SlotManager {
         LOG.info("Closing the slot manager.");
 
         suspend();
-        slotManagerMetricGroup.close();
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index b12f811..5278520 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -218,6 +218,8 @@ public class FineGrainedSlotManager implements SlotManager {
 
         LOG.info("Suspending the slot manager.");
 
+        slotManagerMetricGroup.close();
+
         // stop the timeout checks for the TaskManagers
         if (taskManagerTimeoutsCheck != null) {
             taskManagerTimeoutsCheck.cancel(false);
@@ -249,7 +251,6 @@ public class FineGrainedSlotManager implements SlotManager {
         LOG.info("Closing the slot manager.");
 
         suspend();
-        slotManagerMetricGroup.close();
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 88a130a..1f39968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -32,6 +32,9 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -49,6 +52,7 @@ import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 
@@ -77,6 +81,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
@@ -1450,6 +1455,37 @@ public class DeclarativeSlotManagerTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testMetricsUnregisteredWhenSuspending() throws Exception {
+        testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
+    }
+
+    @Test
+    public void testMetricsUnregisteredWhenClosing() throws Exception {
+        testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
+    }
+
+    private void testAccessMetricValueDuringItsUnregister(
+            ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception {
+        final AtomicInteger registeredMetrics = new AtomicInteger();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer((a, b, c) -> registeredMetrics.incrementAndGet())
+                        .setUnregisterConsumer((a, b, c) -> registeredMetrics.decrementAndGet())
+                        .build();
+
+        final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder()
+                        .setSlotManagerMetricGroup(
+                                SlotManagerMetricGroup.create(metricRegistry, "localhost"))
+                        .buildAndStartWithDirectExec();
+
+        // sanity check to ensure metrics were actually registered
+        assertThat(registeredMetrics.get(), greaterThan(0));
+        closeFn.accept(slotManager);
+        assertThat(registeredMetrics.get(), is(0));
+    }
+
     private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
         final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots);
         for (int i = 0; i < numberSlots; i++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 82d4cf4..3357e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -27,6 +27,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -35,6 +38,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.Test;
 
@@ -44,10 +48,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -55,6 +61,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
 /** Tests of {@link FineGrainedSlotManager}. */
@@ -948,4 +955,43 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
             }
         };
     }
+
+    @Test
+    public void testMetricsUnregisteredWhenSuspending() throws Exception {
+        testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
+    }
+
+    @Test
+    public void testMetricsUnregisteredWhenClosing() throws Exception {
+        testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
+    }
+
+    private void testAccessMetricValueDuringItsUnregister(
+            ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception {
+        final AtomicInteger registeredMetrics = new AtomicInteger();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer((a, b, c) -> registeredMetrics.incrementAndGet())
+                        .setUnregisterConsumer((a, b, c) -> registeredMetrics.decrementAndGet())
+                        .build();
+
+        final Context context = new Context();
+        context.setSlotManagerMetricGroup(
+                SlotManagerMetricGroup.create(metricRegistry, "localhost"));
+
+        context.runTest(
+                () -> {
+                    // sanity check to ensure metrics were actually registered
+                    assertThat(registeredMetrics.get(), greaterThan(0));
+                    context.runInMainThreadAndWait(
+                            () -> {
+                                try {
+                                    closeFn.accept(context.getSlotManager());
+                                } catch (Exception e) {
+                                    fail("Error when closing slot manager.");
+                                }
+                            });
+                    assertThat(registeredMetrics.get(), is(0));
+                });
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 69fe129..a05098e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -136,7 +137,7 @@ public abstract class FineGrainedSlotManagerTestBase extends TestLogger {
         private final TaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
         private final SlotStatusSyncer slotStatusSyncer =
                 new DefaultSlotStatusSyncer(Time.seconds(10L));
-        private final SlotManagerMetricGroup slotManagerMetricGroup =
+        private SlotManagerMetricGroup slotManagerMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
         private final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
         private final Executor mainThreadExecutor = MAIN_THREAD_EXECUTOR;
@@ -171,10 +172,24 @@ public abstract class FineGrainedSlotManagerTestBase extends TestLogger {
             this.requirementCheckDelay = requirementCheckDelay;
         }
 
+        public void setSlotManagerMetricGroup(SlotManagerMetricGroup slotManagerMetricGroup) {
+            this.slotManagerMetricGroup = slotManagerMetricGroup;
+        }
+
         void runInMainThread(Runnable runnable) {
             mainThreadExecutor.execute(runnable);
         }
 
+        void runInMainThreadAndWait(Runnable runnable) throws InterruptedException {
+            final OneShotLatch latch = new OneShotLatch();
+            mainThreadExecutor.execute(
+                    () -> {
+                        runnable.run();
+                        latch.trigger();
+                    });
+            latch.await();
+        }
+
         protected final void runTest(RunnableWithException testMethod) throws Exception {
             slotManager =
                     new FineGrainedSlotManager(
@@ -187,7 +202,7 @@ public abstract class FineGrainedSlotManagerTestBase extends TestLogger {
                             getResourceAllocationStrategy()
                                     .orElse(resourceAllocationStrategyBuilder.build()),
                             Time.milliseconds(requirementCheckDelay));
-            runInMainThread(
+            runInMainThreadAndWait(
                     () ->
                             slotManager.start(
                                     resourceManagerId,