You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/11/14 18:14:08 UTC

[flink] branch master updated: [FLINK-24235][metrics] Only support reporters via factories

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9a394c608 [FLINK-24235][metrics] Only support reporters via factories
3c9a394c608 is described below

commit 3c9a394c608125827d224180c44128718972a82c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Nov 14 19:13:59 2022 +0100

    [FLINK-24235][metrics] Only support reporters via factories
---
 .../metrics/reporter/InstantiateViaFactory.java    |   9 +-
 .../InterceptInstantiationViaReflection.java       |   7 +-
 .../flink/runtime/metrics/ReporterSetup.java       |  57 +------
 .../flink/runtime/metrics/ReporterSetupTest.java   | 178 ---------------------
 .../runtime/metrics/JobManagerMetricsITCase.java   |  46 ++++--
 .../metrics/SystemResourcesMetricsITCase.java      |  42 +++--
 6 files changed, 68 insertions(+), 271 deletions(-)

diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
index 36c1d65a98a..1b59665c387 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
@@ -26,14 +26,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Annotation for {@link MetricReporter MetricReporters} that support factories but want to maintain
- * backwards-compatibility with existing reflection-based configurations.
- *
- * <p>When an annotated reporter is configured to be used via reflection the given factory will be
- * used instead.
- *
- * <p>Attention: This annotation does not work if the reporter is loaded as a plugin. For these
- * cases, annotate the factory with {@link InterceptInstantiationViaReflection} instead.
+ * This annotation has no effect and is only kept for compatibility reasons.
  *
  * @deprecated Will be removed in a future version. Users should use all reporters as plugins.
  */
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
index 262f3fa0dbd..e867451bf8b 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
@@ -25,13 +25,8 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Annotation for {@link MetricReporterFactory MetricReporterFactories} that want to maintain
- * backwards-compatibility with existing reflection-based configurations.
+ * This annotation has no effect and is only kept for compatibility reasons.
  *
- * <p>When a reporter is configured to be used via reflection the annotated factory will be used
- * instead.
- *
- * @see InstantiateViaFactory
  * @deprecated Will be removed in a future version. Users should use all reporters as plugins.
  */
 @Retention(RetentionPolicy.RUNTIME)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index c67f520dbb5..122dc229ba6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -25,8 +25,6 @@ import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.InstantiateViaFactory;
-import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.filter.DefaultMetricFilter;
@@ -365,8 +363,7 @@ public final class ReporterSetup {
     private static Optional<MetricReporter> loadReporter(
             final String reporterName,
             final Configuration reporterConfig,
-            final Map<String, MetricReporterFactory> reporterFactories)
-            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+            final Map<String, MetricReporterFactory> reporterFactories) {
 
         final String reporterClassName = reporterConfig.get(MetricOptions.REPORTER_CLASS);
         final String factoryClassName = reporterConfig.get(MetricOptions.REPORTER_FACTORY_CLASS);
@@ -378,40 +375,16 @@ public final class ReporterSetup {
 
         if (reporterClassName != null) {
             LOG.warn(
-                    "The reporter configuration of '{}' configures the reporter class, which is a deprecated approach to configure reporters."
-                            + " Please configure a factory class instead: '{}{}.{}: <factoryClass>' to ensure that the configuration"
-                            + " continues to work with future versions.",
+                    "The reporter configuration of '{}' configures the reporter class, which is a no longer supported approach to configure reporters."
+                            + " Please configure a factory class instead: '{}{}.{}: <factoryClass>'.",
                     reporterName,
                     ConfigConstants.METRICS_REPORTER_PREFIX,
                     reporterName,
                     MetricOptions.REPORTER_FACTORY_CLASS.key());
-
-            final Optional<MetricReporterFactory> interceptingFactory =
-                    reporterFactories.values().stream()
-                            .filter(
-                                    factory -> {
-                                        InterceptInstantiationViaReflection annotation =
-                                                factory.getClass()
-                                                        .getAnnotation(
-                                                                InterceptInstantiationViaReflection
-                                                                        .class);
-                                        return annotation != null
-                                                && annotation
-                                                        .reporterClassName()
-                                                        .equals(reporterClassName);
-                                    })
-                            .findAny();
-
-            if (interceptingFactory.isPresent()) {
-                return loadViaFactory(reporterConfig, interceptingFactory.get());
-            }
-
-            return loadViaReflection(
-                    reporterClassName, reporterName, reporterConfig, reporterFactories);
         }
 
         LOG.warn(
-                "No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.",
+                "No reporter factory set for reporter {}. Metrics might not be exposed/reported.",
                 reporterName);
         return Optional.empty();
     }
@@ -444,26 +417,4 @@ public final class ReporterSetup {
 
         return Optional.of(factory.createMetricReporter(metricConfig));
     }
-
-    @SuppressWarnings("deprecation")
-    private static Optional<MetricReporter> loadViaReflection(
-            final String reporterClassName,
-            final String reporterName,
-            final Configuration reporterConfig,
-            final Map<String, MetricReporterFactory> reporterFactories)
-            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-
-        final Class<?> reporterClass = Class.forName(reporterClassName);
-
-        final InstantiateViaFactory alternativeFactoryAnnotation =
-                reporterClass.getAnnotation(InstantiateViaFactory.class);
-        if (alternativeFactoryAnnotation != null) {
-            final String alternativeFactoryClassName =
-                    alternativeFactoryAnnotation.factoryClassName();
-            return loadViaFactory(
-                    alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
-        }
-
-        return Optional.of((MetricReporter) reporterClass.newInstance());
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index 8f341dc3dbb..ddd9aefd542 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -21,10 +21,7 @@ package org.apache.flink.runtime.metrics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.plugin.TestingPluginManager;
 import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.InstantiateViaFactory;
-import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
@@ -37,7 +34,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -47,7 +43,6 @@ import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /** Tests for the {@link ReporterSetup}. */
 @ExtendWith(TestLoggerExtension.class)
@@ -65,7 +60,6 @@ class ReporterSetupTest {
                             TestReporter13.class.getName(),
                             TestReporterFactory.class.getName(),
                             FailingFactory.class.getName(),
-                            InstantiationTypeTrackingTestReporterFactory.class.getName(),
                             ConfigExposingReporterFactory.class.getName())
                     .build();
 
@@ -305,36 +299,6 @@ class ReporterSetupTest {
         assertEquals(TestReporterFactory.REPORTER, reporterSetup.getReporter());
     }
 
-    /**
-     * Verifies that the factory approach is prioritized if both the factory and reflection approach
-     * are configured.
-     */
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testFactoryPrioritization() throws Exception {
-        final Configuration config = new Configuration();
-        config.setString(
-                ConfigConstants.METRICS_REPORTER_PREFIX
-                        + "test."
-                        + MetricOptions.REPORTER_FACTORY_CLASS.key(),
-                InstantiationTypeTrackingTestReporterFactory.class.getName());
-        config.setString(
-                ConfigConstants.METRICS_REPORTER_PREFIX
-                        + "test."
-                        + MetricOptions.REPORTER_CLASS.key(),
-                InstantiationTypeTrackingTestReporter.class.getName());
-
-        final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
-
-        assertEquals(1, reporterSetups.size());
-
-        final ReporterSetup reporterSetup = reporterSetups.get(0);
-        final InstantiationTypeTrackingTestReporter metricReporter =
-                (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
-
-        assertTrue(metricReporter.createdByFactory);
-    }
-
     /** Verifies that an error thrown by a factory does not affect the setup of other reporters. */
     @Test
     void testFactoryFailureIsolation() throws Exception {
@@ -355,37 +319,6 @@ class ReporterSetupTest {
         assertEquals(1, reporterSetups.size());
     }
 
-    /** Verifies that factory/reflection approaches can be mixed freely. */
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testMixedSetupsFactoryParsing() throws Exception {
-        final Configuration config = new Configuration();
-        config.setString(
-                ConfigConstants.METRICS_REPORTER_PREFIX
-                        + "test1."
-                        + MetricOptions.REPORTER_FACTORY_CLASS.key(),
-                InstantiationTypeTrackingTestReporterFactory.class.getName());
-        config.setString(
-                ConfigConstants.METRICS_REPORTER_PREFIX
-                        + "test2."
-                        + MetricOptions.REPORTER_CLASS.key(),
-                InstantiationTypeTrackingTestReporter.class.getName());
-
-        final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
-
-        assertEquals(2, reporterSetups.size());
-
-        final ReporterSetup reporterSetup1 = reporterSetups.get(0);
-        final ReporterSetup reporterSetup2 = reporterSetups.get(1);
-
-        final InstantiationTypeTrackingTestReporter metricReporter1 =
-                (InstantiationTypeTrackingTestReporter) reporterSetup1.getReporter();
-        final InstantiationTypeTrackingTestReporter metricReporter2 =
-                (InstantiationTypeTrackingTestReporter) reporterSetup2.getReporter();
-
-        assertTrue(metricReporter1.createdByFactory ^ metricReporter2.createdByFactory);
-    }
-
     @Test
     void testFactoryArgumentForwarding() throws Exception {
         final Configuration config = new Configuration();
@@ -402,64 +335,6 @@ class ReporterSetupTest {
         assertEquals("hello", passedConfig.getProperty("arg"));
     }
 
-    /**
-     * Verifies that the factory approach is used if the factory is annotated with {@link
-     * InstantiateViaFactory}.
-     */
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testFactoryAnnotation() {
-        final Configuration config = new Configuration();
-        config.setString(
-                ConfigConstants.METRICS_REPORTER_PREFIX
-                        + "test."
-                        + MetricOptions.REPORTER_CLASS.key(),
-                InstantiationTypeTrackingTestReporter2.class.getName());
-
-        final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
-
-        assertEquals(1, reporterSetups.size());
-
-        final ReporterSetup reporterSetup = reporterSetups.get(0);
-        final InstantiationTypeTrackingTestReporter metricReporter =
-                (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
-
-        assertTrue(metricReporter.createdByFactory);
-    }
-
-    /**
-     * Verifies that the factory approach is used if the factory is annotated with {@link
-     * org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection}.
-     */
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testReflectionInterception() {
-        final Configuration config = new Configuration();
-        config.setString(
-                ConfigConstants.METRICS_REPORTER_PREFIX
-                        + "test."
-                        + MetricOptions.REPORTER_CLASS.key(),
-                InstantiationTypeTrackingTestReporter.class.getName());
-
-        final List<ReporterSetup> reporterSetups =
-                ReporterSetup.fromConfiguration(
-                        config,
-                        new TestingPluginManager(
-                                Collections.singletonMap(
-                                        MetricReporterFactory.class,
-                                        Collections.singletonList(
-                                                        new InterceptingInstantiationTypeTrackingTestReporterFactory())
-                                                .iterator())));
-
-        assertEquals(1, reporterSetups.size());
-
-        final ReporterSetup reporterSetup = reporterSetups.get(0);
-        final InstantiationTypeTrackingTestReporter metricReporter =
-                (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
-
-        assertTrue(metricReporter.createdByFactory);
-    }
-
     @Test
     void testAdditionalVariablesParsing() {
         final String tag1 = "foo";
@@ -524,57 +399,4 @@ class ReporterSetupTest {
             throw new RuntimeException();
         }
     }
-
-    /** Factory for {@link InstantiationTypeTrackingTestReporter}. */
-    public static class InstantiationTypeTrackingTestReporterFactory
-            implements MetricReporterFactory {
-
-        @Override
-        public MetricReporter createMetricReporter(Properties config) {
-            return new InstantiationTypeTrackingTestReporter(true);
-        }
-    }
-
-    /**
-     * Factory for {@link InstantiationTypeTrackingTestReporter} that intercepts reflection-based
-     * instantiation attempts.
-     */
-    @InterceptInstantiationViaReflection(
-            reporterClassName =
-                    "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporter")
-    @SuppressWarnings("deprecation")
-    public static class InterceptingInstantiationTypeTrackingTestReporterFactory
-            implements MetricReporterFactory {
-
-        @Override
-        public MetricReporter createMetricReporter(Properties config) {
-            return new InstantiationTypeTrackingTestReporter(true);
-        }
-    }
-
-    /** Reporter that exposes which constructor was called. */
-    protected static class InstantiationTypeTrackingTestReporter extends TestReporter {
-
-        private final boolean createdByFactory;
-
-        public InstantiationTypeTrackingTestReporter() {
-            this(false);
-        }
-
-        InstantiationTypeTrackingTestReporter(boolean createdByFactory) {
-            this.createdByFactory = createdByFactory;
-        }
-
-        public boolean isCreatedByFactory() {
-            return createdByFactory;
-        }
-    }
-
-    /** Annotated reporter that exposes which constructor was called. */
-    @InstantiateViaFactory(
-            factoryClassName =
-                    "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory")
-    @SuppressWarnings("deprecation")
-    protected static class InstantiationTypeTrackingTestReporter2
-            extends InstantiationTypeTrackingTestReporter {}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index 7d8847d13fa..07e7e153440 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -23,21 +23,25 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
 
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -45,7 +49,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /** Integration tests for proper initialization of the job manager metrics. */
-public class JobManagerMetricsITCase extends TestLogger {
+class JobManagerMetricsITCase {
 
     private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager.";
 
@@ -53,17 +57,25 @@ public class JobManagerMetricsITCase extends TestLogger {
 
     private CheckedThread jobExecuteThread;
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    @Order(1)
+    static final ContextClassLoaderExtension CONTEXT_CLASS_LOADER_EXTENSION =
+            ContextClassLoaderExtension.builder()
+                    .withServiceEntry(MetricReporterFactory.class, TestReporter.class.getName())
+                    .build();
+
+    @RegisterExtension
+    @Order(2)
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getConfiguration())
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(1)
                             .build());
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
         jobExecuteThread =
                 new CheckedThread() {
 
@@ -96,7 +108,7 @@ public class JobManagerMetricsITCase extends TestLogger {
     }
 
     @Test
-    public void testJobManagerMetrics() throws Exception {
+    void testJobManagerMetrics() throws Exception {
         assertEquals(1, TestReporter.OPENED_REPORTERS.size());
         TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
 
@@ -138,7 +150,7 @@ public class JobManagerMetricsITCase extends TestLogger {
     private static Configuration getConfiguration() {
         Configuration configuration = new Configuration();
         configuration.setString(
-                "metrics.reporter.test_reporter.class", TestReporter.class.getName());
+                "metrics.reporter.test_reporter.factory.class", TestReporter.class.getName());
         return configuration;
     }
 
@@ -160,7 +172,8 @@ public class JobManagerMetricsITCase extends TestLogger {
     }
 
     /** Test metric reporter that exposes registered metrics. */
-    public static final class TestReporter extends AbstractReporter {
+    public static final class TestReporter extends AbstractReporter
+            implements MetricReporterFactory {
         public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
 
         @Override
@@ -181,5 +194,10 @@ public class JobManagerMetricsITCase extends TestLogger {
         public Map<Gauge<?>, String> getGauges() {
             return gauges;
         }
+
+        @Override
+        public MetricReporter createMetricReporter(Properties properties) {
+            return this;
+        }
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index 0082460083a..b077cf498d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -25,17 +25,20 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,11 +50,21 @@ import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRI
 import static org.junit.Assert.assertEquals;
 
 /** Integration tests for proper initialization of the system resource metrics. */
-public class SystemResourcesMetricsITCase extends TestLogger {
-
-    @ClassRule
-    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
-            new MiniClusterResource(
+class SystemResourcesMetricsITCase {
+
+    @RegisterExtension
+    @Order(1)
+    static final ContextClassLoaderExtension CONTEXT_CLASS_LOADER_EXTENSION =
+            ContextClassLoaderExtension.builder()
+                    .withServiceEntry(
+                            MetricReporterFactory.class,
+                            SystemResourcesMetricsITCase.TestReporter.class.getName())
+                    .build();
+
+    @RegisterExtension
+    @Order(2)
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getConfiguration())
                             .setNumberTaskManagers(1)
@@ -65,12 +78,12 @@ public class SystemResourcesMetricsITCase extends TestLogger {
         configuration.setString(MetricOptions.SCOPE_NAMING_JM, "jobmanager");
         configuration.setString(MetricOptions.SCOPE_NAMING_TM, "taskmanager");
         configuration.setString(
-                "metrics.reporter.test_reporter.class", TestReporter.class.getName());
+                "metrics.reporter.test_reporter.factory.class", TestReporter.class.getName());
         return configuration;
     }
 
     @Test
-    public void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception {
+    void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception {
         assertEquals(1, TestReporter.OPENED_REPORTERS.size());
         TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
 
@@ -107,7 +120,7 @@ public class SystemResourcesMetricsITCase extends TestLogger {
     }
 
     /** Test metric reporter that exposes registered metrics. */
-    public static final class TestReporter implements MetricReporter {
+    public static final class TestReporter implements MetricReporter, MetricReporterFactory {
         public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
         private final Map<String, CompletableFuture<Void>> patternFutures =
                 getExpectedPatterns().stream()
@@ -140,5 +153,10 @@ public class SystemResourcesMetricsITCase extends TestLogger {
 
         @Override
         public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
+
+        @Override
+        public MetricReporter createMetricReporter(Properties properties) {
+            return this;
+        }
     }
 }