You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/11/14 18:14:08 UTC
[flink] branch master updated: [FLINK-24235][metrics] Only support reporters via factories
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9a394c608 [FLINK-24235][metrics] Only support reporters via factories
3c9a394c608 is described below
commit 3c9a394c608125827d224180c44128718972a82c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Nov 14 19:13:59 2022 +0100
[FLINK-24235][metrics] Only support reporters via factories
---
.../metrics/reporter/InstantiateViaFactory.java | 9 +-
.../InterceptInstantiationViaReflection.java | 7 +-
.../flink/runtime/metrics/ReporterSetup.java | 57 +------
.../flink/runtime/metrics/ReporterSetupTest.java | 178 ---------------------
.../runtime/metrics/JobManagerMetricsITCase.java | 46 ++++--
.../metrics/SystemResourcesMetricsITCase.java | 42 +++--
6 files changed, 68 insertions(+), 271 deletions(-)
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
index 36c1d65a98a..1b59665c387 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
@@ -26,14 +26,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Annotation for {@link MetricReporter MetricReporters} that support factories but want to maintain
- * backwards-compatibility with existing reflection-based configurations.
- *
- * <p>When an annotated reporter is configured to be used via reflection the given factory will be
- * used instead.
- *
- * <p>Attention: This annotation does not work if the reporter is loaded as a plugin. For these
- * cases, annotate the factory with {@link InterceptInstantiationViaReflection} instead.
+ * This annotation has no effect and is only kept for compatibility reasons.
*
* @deprecated Will be removed in a future version. Users should use all reporters as plugins.
*/
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
index 262f3fa0dbd..e867451bf8b 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
@@ -25,13 +25,8 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Annotation for {@link MetricReporterFactory MetricReporterFactories} that want to maintain
- * backwards-compatibility with existing reflection-based configurations.
+ * This annotation has no effect and is only kept for compatibility reasons.
*
- * <p>When a reporter is configured to be used via reflection the annotated factory will be used
- * instead.
- *
- * @see InstantiateViaFactory
* @deprecated Will be removed in a future version. Users should use all reporters as plugins.
*/
@Retention(RetentionPolicy.RUNTIME)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index c67f520dbb5..122dc229ba6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -25,8 +25,6 @@ import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.InstantiateViaFactory;
-import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.runtime.metrics.filter.DefaultMetricFilter;
@@ -365,8 +363,7 @@ public final class ReporterSetup {
private static Optional<MetricReporter> loadReporter(
final String reporterName,
final Configuration reporterConfig,
- final Map<String, MetricReporterFactory> reporterFactories)
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ final Map<String, MetricReporterFactory> reporterFactories) {
final String reporterClassName = reporterConfig.get(MetricOptions.REPORTER_CLASS);
final String factoryClassName = reporterConfig.get(MetricOptions.REPORTER_FACTORY_CLASS);
@@ -378,40 +375,16 @@ public final class ReporterSetup {
if (reporterClassName != null) {
LOG.warn(
- "The reporter configuration of '{}' configures the reporter class, which is a deprecated approach to configure reporters."
- + " Please configure a factory class instead: '{}{}.{}: <factoryClass>' to ensure that the configuration"
- + " continues to work with future versions.",
+ "The reporter configuration of '{}' configures the reporter class, which is a no longer supported approach to configure reporters."
+ + " Please configure a factory class instead: '{}{}.{}: <factoryClass>'.",
reporterName,
ConfigConstants.METRICS_REPORTER_PREFIX,
reporterName,
MetricOptions.REPORTER_FACTORY_CLASS.key());
-
- final Optional<MetricReporterFactory> interceptingFactory =
- reporterFactories.values().stream()
- .filter(
- factory -> {
- InterceptInstantiationViaReflection annotation =
- factory.getClass()
- .getAnnotation(
- InterceptInstantiationViaReflection
- .class);
- return annotation != null
- && annotation
- .reporterClassName()
- .equals(reporterClassName);
- })
- .findAny();
-
- if (interceptingFactory.isPresent()) {
- return loadViaFactory(reporterConfig, interceptingFactory.get());
- }
-
- return loadViaReflection(
- reporterClassName, reporterName, reporterConfig, reporterFactories);
}
LOG.warn(
- "No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.",
+ "No reporter factory set for reporter {}. Metrics might not be exposed/reported.",
reporterName);
return Optional.empty();
}
@@ -444,26 +417,4 @@ public final class ReporterSetup {
return Optional.of(factory.createMetricReporter(metricConfig));
}
-
- @SuppressWarnings("deprecation")
- private static Optional<MetricReporter> loadViaReflection(
- final String reporterClassName,
- final String reporterName,
- final Configuration reporterConfig,
- final Map<String, MetricReporterFactory> reporterFactories)
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-
- final Class<?> reporterClass = Class.forName(reporterClassName);
-
- final InstantiateViaFactory alternativeFactoryAnnotation =
- reporterClass.getAnnotation(InstantiateViaFactory.class);
- if (alternativeFactoryAnnotation != null) {
- final String alternativeFactoryClassName =
- alternativeFactoryAnnotation.factoryClassName();
- return loadViaFactory(
- alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
- }
-
- return Optional.of((MetricReporter) reporterClass.newInstance());
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index 8f341dc3dbb..ddd9aefd542 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -21,10 +21,7 @@ package org.apache.flink.runtime.metrics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.plugin.TestingPluginManager;
import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.InstantiateViaFactory;
-import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
@@ -37,7 +34,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -47,7 +43,6 @@ import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.core.IsCollectionContaining.hasItems;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/** Tests for the {@link ReporterSetup}. */
@ExtendWith(TestLoggerExtension.class)
@@ -65,7 +60,6 @@ class ReporterSetupTest {
TestReporter13.class.getName(),
TestReporterFactory.class.getName(),
FailingFactory.class.getName(),
- InstantiationTypeTrackingTestReporterFactory.class.getName(),
ConfigExposingReporterFactory.class.getName())
.build();
@@ -305,36 +299,6 @@ class ReporterSetupTest {
assertEquals(TestReporterFactory.REPORTER, reporterSetup.getReporter());
}
- /**
- * Verifies that the factory approach is prioritized if both the factory and reflection approach
- * are configured.
- */
- @Test
- @SuppressWarnings("deprecation")
- public void testFactoryPrioritization() throws Exception {
- final Configuration config = new Configuration();
- config.setString(
- ConfigConstants.METRICS_REPORTER_PREFIX
- + "test."
- + MetricOptions.REPORTER_FACTORY_CLASS.key(),
- InstantiationTypeTrackingTestReporterFactory.class.getName());
- config.setString(
- ConfigConstants.METRICS_REPORTER_PREFIX
- + "test."
- + MetricOptions.REPORTER_CLASS.key(),
- InstantiationTypeTrackingTestReporter.class.getName());
-
- final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
-
- assertEquals(1, reporterSetups.size());
-
- final ReporterSetup reporterSetup = reporterSetups.get(0);
- final InstantiationTypeTrackingTestReporter metricReporter =
- (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
-
- assertTrue(metricReporter.createdByFactory);
- }
-
/** Verifies that an error thrown by a factory does not affect the setup of other reporters. */
@Test
void testFactoryFailureIsolation() throws Exception {
@@ -355,37 +319,6 @@ class ReporterSetupTest {
assertEquals(1, reporterSetups.size());
}
- /** Verifies that factory/reflection approaches can be mixed freely. */
- @Test
- @SuppressWarnings("deprecation")
- public void testMixedSetupsFactoryParsing() throws Exception {
- final Configuration config = new Configuration();
- config.setString(
- ConfigConstants.METRICS_REPORTER_PREFIX
- + "test1."
- + MetricOptions.REPORTER_FACTORY_CLASS.key(),
- InstantiationTypeTrackingTestReporterFactory.class.getName());
- config.setString(
- ConfigConstants.METRICS_REPORTER_PREFIX
- + "test2."
- + MetricOptions.REPORTER_CLASS.key(),
- InstantiationTypeTrackingTestReporter.class.getName());
-
- final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
-
- assertEquals(2, reporterSetups.size());
-
- final ReporterSetup reporterSetup1 = reporterSetups.get(0);
- final ReporterSetup reporterSetup2 = reporterSetups.get(1);
-
- final InstantiationTypeTrackingTestReporter metricReporter1 =
- (InstantiationTypeTrackingTestReporter) reporterSetup1.getReporter();
- final InstantiationTypeTrackingTestReporter metricReporter2 =
- (InstantiationTypeTrackingTestReporter) reporterSetup2.getReporter();
-
- assertTrue(metricReporter1.createdByFactory ^ metricReporter2.createdByFactory);
- }
-
@Test
void testFactoryArgumentForwarding() throws Exception {
final Configuration config = new Configuration();
@@ -402,64 +335,6 @@ class ReporterSetupTest {
assertEquals("hello", passedConfig.getProperty("arg"));
}
- /**
- * Verifies that the factory approach is used if the factory is annotated with {@link
- * InstantiateViaFactory}.
- */
- @Test
- @SuppressWarnings("deprecation")
- public void testFactoryAnnotation() {
- final Configuration config = new Configuration();
- config.setString(
- ConfigConstants.METRICS_REPORTER_PREFIX
- + "test."
- + MetricOptions.REPORTER_CLASS.key(),
- InstantiationTypeTrackingTestReporter2.class.getName());
-
- final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);
-
- assertEquals(1, reporterSetups.size());
-
- final ReporterSetup reporterSetup = reporterSetups.get(0);
- final InstantiationTypeTrackingTestReporter metricReporter =
- (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
-
- assertTrue(metricReporter.createdByFactory);
- }
-
- /**
- * Verifies that the factory approach is used if the factory is annotated with {@link
- * org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection}.
- */
- @Test
- @SuppressWarnings("deprecation")
- public void testReflectionInterception() {
- final Configuration config = new Configuration();
- config.setString(
- ConfigConstants.METRICS_REPORTER_PREFIX
- + "test."
- + MetricOptions.REPORTER_CLASS.key(),
- InstantiationTypeTrackingTestReporter.class.getName());
-
- final List<ReporterSetup> reporterSetups =
- ReporterSetup.fromConfiguration(
- config,
- new TestingPluginManager(
- Collections.singletonMap(
- MetricReporterFactory.class,
- Collections.singletonList(
- new InterceptingInstantiationTypeTrackingTestReporterFactory())
- .iterator())));
-
- assertEquals(1, reporterSetups.size());
-
- final ReporterSetup reporterSetup = reporterSetups.get(0);
- final InstantiationTypeTrackingTestReporter metricReporter =
- (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
-
- assertTrue(metricReporter.createdByFactory);
- }
-
@Test
void testAdditionalVariablesParsing() {
final String tag1 = "foo";
@@ -524,57 +399,4 @@ class ReporterSetupTest {
throw new RuntimeException();
}
}
-
- /** Factory for {@link InstantiationTypeTrackingTestReporter}. */
- public static class InstantiationTypeTrackingTestReporterFactory
- implements MetricReporterFactory {
-
- @Override
- public MetricReporter createMetricReporter(Properties config) {
- return new InstantiationTypeTrackingTestReporter(true);
- }
- }
-
- /**
- * Factory for {@link InstantiationTypeTrackingTestReporter} that intercepts reflection-based
- * instantiation attempts.
- */
- @InterceptInstantiationViaReflection(
- reporterClassName =
- "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporter")
- @SuppressWarnings("deprecation")
- public static class InterceptingInstantiationTypeTrackingTestReporterFactory
- implements MetricReporterFactory {
-
- @Override
- public MetricReporter createMetricReporter(Properties config) {
- return new InstantiationTypeTrackingTestReporter(true);
- }
- }
-
- /** Reporter that exposes which constructor was called. */
- protected static class InstantiationTypeTrackingTestReporter extends TestReporter {
-
- private final boolean createdByFactory;
-
- public InstantiationTypeTrackingTestReporter() {
- this(false);
- }
-
- InstantiationTypeTrackingTestReporter(boolean createdByFactory) {
- this.createdByFactory = createdByFactory;
- }
-
- public boolean isCreatedByFactory() {
- return createdByFactory;
- }
- }
-
- /** Annotated reporter that exposes which constructor was called. */
- @InstantiateViaFactory(
- factoryClassName =
- "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory")
- @SuppressWarnings("deprecation")
- protected static class InstantiationTypeTrackingTestReporter2
- extends InstantiationTypeTrackingTestReporter {}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index 7d8847d13fa..07e7e153440 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -23,21 +23,25 @@ import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +49,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/** Integration tests for proper initialization of the job manager metrics. */
-public class JobManagerMetricsITCase extends TestLogger {
+class JobManagerMetricsITCase {
private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager.";
@@ -53,17 +57,25 @@ public class JobManagerMetricsITCase extends TestLogger {
private CheckedThread jobExecuteThread;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ @Order(1)
+ static final ContextClassLoaderExtension CONTEXT_CLASS_LOADER_EXTENSION =
+ ContextClassLoaderExtension.builder()
+ .withServiceEntry(MetricReporterFactory.class, TestReporter.class.getName())
+ .build();
+
+ @RegisterExtension
+ @Order(2)
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build());
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() throws Exception {
jobExecuteThread =
new CheckedThread() {
@@ -96,7 +108,7 @@ public class JobManagerMetricsITCase extends TestLogger {
}
@Test
- public void testJobManagerMetrics() throws Exception {
+ void testJobManagerMetrics() throws Exception {
assertEquals(1, TestReporter.OPENED_REPORTERS.size());
TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
@@ -138,7 +150,7 @@ public class JobManagerMetricsITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration configuration = new Configuration();
configuration.setString(
- "metrics.reporter.test_reporter.class", TestReporter.class.getName());
+ "metrics.reporter.test_reporter.factory.class", TestReporter.class.getName());
return configuration;
}
@@ -160,7 +172,8 @@ public class JobManagerMetricsITCase extends TestLogger {
}
/** Test metric reporter that exposes registered metrics. */
- public static final class TestReporter extends AbstractReporter {
+ public static final class TestReporter extends AbstractReporter
+ implements MetricReporterFactory {
public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
@Override
@@ -181,5 +194,10 @@ public class JobManagerMetricsITCase extends TestLogger {
public Map<Gauge<?>, String> getGauges() {
return gauges;
}
+
+ @Override
+ public MetricReporter createMetricReporter(Properties properties) {
+ return this;
+ }
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index 0082460083a..b077cf498d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -25,17 +25,20 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,11 +50,21 @@ import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRI
import static org.junit.Assert.assertEquals;
/** Integration tests for proper initialization of the system resource metrics. */
-public class SystemResourcesMetricsITCase extends TestLogger {
-
- @ClassRule
- public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
- new MiniClusterResource(
+class SystemResourcesMetricsITCase {
+
+ @RegisterExtension
+ @Order(1)
+ static final ContextClassLoaderExtension CONTEXT_CLASS_LOADER_EXTENSION =
+ ContextClassLoaderExtension.builder()
+ .withServiceEntry(
+ MetricReporterFactory.class,
+ SystemResourcesMetricsITCase.TestReporter.class.getName())
+ .build();
+
+ @RegisterExtension
+ @Order(2)
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberTaskManagers(1)
@@ -65,12 +78,12 @@ public class SystemResourcesMetricsITCase extends TestLogger {
configuration.setString(MetricOptions.SCOPE_NAMING_JM, "jobmanager");
configuration.setString(MetricOptions.SCOPE_NAMING_TM, "taskmanager");
configuration.setString(
- "metrics.reporter.test_reporter.class", TestReporter.class.getName());
+ "metrics.reporter.test_reporter.factory.class", TestReporter.class.getName());
return configuration;
}
@Test
- public void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception {
+ void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception {
assertEquals(1, TestReporter.OPENED_REPORTERS.size());
TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
@@ -107,7 +120,7 @@ public class SystemResourcesMetricsITCase extends TestLogger {
}
/** Test metric reporter that exposes registered metrics. */
- public static final class TestReporter implements MetricReporter {
+ public static final class TestReporter implements MetricReporter, MetricReporterFactory {
public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
private final Map<String, CompletableFuture<Void>> patternFutures =
getExpectedPatterns().stream()
@@ -140,5 +153,10 @@ public class SystemResourcesMetricsITCase extends TestLogger {
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
+
+ @Override
+ public MetricReporter createMetricReporter(Properties properties) {
+ return this;
+ }
}
}