You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/29 08:20:09 UTC

[flink] branch release-1.11 updated (4882cc6 -> 4daa859)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4882cc6  [FLINK-18439][docs] Update sql client jar url in docs
     new 1732b71  [FLINK-18435][metrics] Add support for intercepting reflection-based instantiations
     new 4daa859  [FLINK-18435][metrics] Adjust reporter factories to intercept reflection-based instantiations

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tests/PrometheusReporterEndToEndITCase.java    |  8 +++++
 .../metrics/reporter/InstantiateViaFactory.java    |  3 ++
 .../InterceptInstantiationViaReflection.java       | 25 ++++++++--------
 .../datadog/DatadogHttpReporterFactory.java        |  2 ++
 .../metrics/graphite/GraphiteReporterFactory.java  |  2 ++
 .../metrics/influxdb/InfluxdbReporterFactory.java  |  2 ++
 .../flink/metrics/jmx/JMXReporterFactory.java      |  2 ++
 .../PrometheusPushGatewayReporterFactory.java      |  2 ++
 .../prometheus/PrometheusReporterFactory.java      |  2 ++
 .../flink/metrics/slf4j/Slf4jReporterFactory.java  |  2 ++
 .../metrics/statsd/StatsDReporterFactory.java      |  2 ++
 .../flink/runtime/metrics/ReporterSetup.java       | 27 ++++++++++++++---
 .../flink/runtime/metrics/ReporterSetupTest.java   | 35 ++++++++++++++++++++++
 13 files changed, 98 insertions(+), 16 deletions(-)
 copy flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java => flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java (55%)


[flink] 02/02: [FLINK-18435][metrics] Adjust reporter factories to intercept reflection-based instantiations

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4daa859e2f3015cd81d11661dec3a11ed24e045d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jun 26 14:42:17 2020 +0200

    [FLINK-18435][metrics] Adjust reporter factories to intercept reflection-based instantiations
---
 .../prometheus/tests/PrometheusReporterEndToEndITCase.java        | 8 ++++++++
 .../apache/flink/metrics/datadog/DatadogHttpReporterFactory.java  | 2 ++
 .../apache/flink/metrics/graphite/GraphiteReporterFactory.java    | 2 ++
 .../apache/flink/metrics/influxdb/InfluxdbReporterFactory.java    | 2 ++
 .../java/org/apache/flink/metrics/jmx/JMXReporterFactory.java     | 2 ++
 .../metrics/prometheus/PrometheusPushGatewayReporterFactory.java  | 2 ++
 .../flink/metrics/prometheus/PrometheusReporterFactory.java       | 2 ++
 .../java/org/apache/flink/metrics/slf4j/Slf4jReporterFactory.java | 2 ++
 .../org/apache/flink/metrics/statsd/StatsDReporterFactory.java    | 2 ++
 9 files changed, 24 insertions(+)

diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 44ad63d..dbcad2a 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -135,11 +135,19 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 				FACTORY),
 			TestParams.from("Jar in 'plugins'",
 				builder -> {},
+				REFLECTION),
+			TestParams.from("Jar in 'plugins'",
+				builder -> {},
 				FACTORY),
 			TestParams.from("Jar in 'lib' and 'plugins'",
 				builder -> {
 					builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.PLUGINS, JarLocation.LIB);
 				},
+				REFLECTION),
+			TestParams.from("Jar in 'lib' and 'plugins'",
+				builder -> {
+					builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.PLUGINS, JarLocation.LIB);
+				},
 				FACTORY)
 		);
 	}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporterFactory.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporterFactory.java
index d4e2e86..a7dce94 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporterFactory.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.datadog;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
@@ -26,6 +27,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link DatadogHttpReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.datadog.DatadogHttpReporter")
 public class DatadogHttpReporterFactory implements MetricReporterFactory {
 
 	@Override
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporterFactory.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporterFactory.java
index b8357d9..bda681d 100644
--- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporterFactory.java
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.graphite;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
@@ -26,6 +27,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link GraphiteReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.graphite.GraphiteReporter")
 public class GraphiteReporterFactory implements MetricReporterFactory {
 
 	@Override
diff --git a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporterFactory.java b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporterFactory.java
index 7659362..c272a6f 100644
--- a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporterFactory.java
+++ b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.influxdb;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
@@ -26,6 +27,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link InfluxdbReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.influxdb.InfluxdbReporter")
 public class InfluxdbReporterFactory implements MetricReporterFactory {
 
 	@Override
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
index 128fb6b..31d9110 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporterFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.metrics.jmx;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
 import java.util.Properties;
@@ -24,6 +25,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link JMXReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.jmx.JMXReporter")
 public class JMXReporterFactory implements MetricReporterFactory {
 
 	static final String ARG_PORT = "port";
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
index 7b7372e..176ce17 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
 import java.util.Properties;
@@ -24,6 +25,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
 public class PrometheusPushGatewayReporterFactory implements MetricReporterFactory {
 
 	@Override
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
index a1ad365..fecc755 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
 import java.util.Properties;
@@ -24,6 +25,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link PrometheusReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.prometheus.PrometheusReporter")
 public class PrometheusReporterFactory implements MetricReporterFactory {
 
 	@Override
diff --git a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporterFactory.java b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporterFactory.java
index 2acb236..2b2a9f6 100644
--- a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporterFactory.java
+++ b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporterFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.metrics.slf4j;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
@@ -25,6 +26,7 @@ import java.util.Properties;
 /**
  * {@link MetricReporterFactory} for {@link Slf4jReporter}.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.slf4j.Slf4jReporter")
 public class Slf4jReporterFactory implements MetricReporterFactory {
 
 	@Override
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporterFactory.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporterFactory.java
index e409ce3..cda9504 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporterFactory.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.statsd;
 
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 
@@ -26,6 +27,7 @@ import java.util.Properties;
 /**
  * A {@link MetricReporterFactory} implementation that creates a {@link StatsDReporter} instance.
  */
+@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.statsd.StatsDReporter")
 public class StatsDReporterFactory implements MetricReporterFactory {
 
 	@Override


[flink] 01/02: [FLINK-18435][metrics] Add support for intercepting reflection-based instantiations

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1732b71b045b22375376864c5a8fab998dbaa428
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jun 26 14:39:37 2020 +0200

    [FLINK-18435][metrics] Add support for intercepting reflection-based instantiations
---
 .../metrics/reporter/InstantiateViaFactory.java    |  3 ++
 .../InterceptInstantiationViaReflection.java       | 37 ++++++++++++++++++++++
 .../flink/runtime/metrics/ReporterSetup.java       | 27 +++++++++++++---
 .../flink/runtime/metrics/ReporterSetupTest.java   | 35 ++++++++++++++++++++
 4 files changed, 98 insertions(+), 4 deletions(-)

diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
index 454ec91..a5c6ed1 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java
@@ -28,6 +28,9 @@ import java.lang.annotation.Target;
  * backwards-compatibility with existing reflection-based configurations.
  *
  * <p>When an annotated reporter is configured to be used via reflection the given factory will be used instead.
+ *
+ * <p>Attention: This annotation does not work if the reporter is loaded as a plugin. For these cases, annotate the
+ * factory with {@link InterceptInstantiationViaReflection} instead.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.TYPE)
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
new file mode 100644
index 0000000..6a6f05b
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.reporter;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for {@link MetricReporterFactory MetricReporterFactories} that want to maintain
+ * backwards-compatibility with existing reflection-based configurations.
+ *
+ * <p>When a reporter whose class name equals {@link #reporterClassName()} is configured to be instantiated via reflection, the annotated factory is used instead.
+ *
+ * @see InstantiateViaFactory
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface InterceptInstantiationViaReflection {
+	String reporterClassName();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index da15079..0427918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.InstantiateViaFactory;
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
@@ -259,6 +260,17 @@ public final class ReporterSetup {
 		}
 
 		if (reporterClassName != null) {
+			final Optional<MetricReporterFactory> interceptingFactory = reporterFactories.values().stream()
+				.filter(factory -> {
+					InterceptInstantiationViaReflection annotation = factory.getClass().getAnnotation(InterceptInstantiationViaReflection.class);
+					return annotation != null && annotation.reporterClassName().equals(reporterClassName);
+				})
+				.findAny();
+
+			if (interceptingFactory.isPresent()) {
+				return loadViaFactory(reporterConfig, interceptingFactory.get());
+			}
+
 			return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
 		}
 
@@ -278,13 +290,20 @@ public final class ReporterSetup {
 			LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: {}.", factoryClassName, reporterName, reporterFactories.keySet());
 			return Optional.empty();
 		} else {
-			final MetricConfig metricConfig = new MetricConfig();
-			reporterConfig.addAllToProperties(metricConfig);
-
-			return Optional.of(factory.createMetricReporter(metricConfig));
+			return loadViaFactory(reporterConfig, factory);
 		}
 	}
 
+	private static Optional<MetricReporter> loadViaFactory(
+		final Configuration reporterConfig,
+		final MetricReporterFactory factory) {
+
+		final MetricConfig metricConfig = new MetricConfig();
+		reporterConfig.addAllToProperties(metricConfig);
+
+		return Optional.of(factory.createMetricReporter(metricConfig));
+	}
+
 	private static Optional<MetricReporter> loadViaReflection(
 			final String reporterClassName,
 			final String reporterName,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index f687c66..728f783 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.metrics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.plugin.TestingPluginManager;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.InstantiateViaFactory;
+import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
@@ -32,6 +34,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -342,6 +345,26 @@ public class ReporterSetupTest extends TestLogger {
 	}
 
 	/**
+	 * Verifies that a reporter configured via its class name is created by the factory if the factory is annotated with {@link InterceptInstantiationViaReflection}.
+	 */
+	@Test
+	public void testReflectionInterception() {
+		final Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName());
+
+		final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, new TestingPluginManager(Collections.singletonMap(
+			MetricReporterFactory.class,
+			Collections.singletonList(new InterceptingInstantiationTypeTrackingTestReporterFactory()).iterator())));
+
+		assertEquals(1, reporterSetups.size());
+
+		final ReporterSetup reporterSetup = reporterSetups.get(0);
+		final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();
+
+		assertTrue(metricReporter.createdByFactory);
+	}
+
+	/**
 	 * Factory that exposed the last provided metric config.
 	 */
 	public static class ConfigExposingReporterFactory implements MetricReporterFactory {
@@ -391,6 +414,18 @@ public class ReporterSetupTest extends TestLogger {
 	}
 
 	/**
+	 * Factory for {@link InstantiationTypeTrackingTestReporter} that intercepts reflection-based instantiation attempts.
+	 */
+	@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporter")
+	public static class InterceptingInstantiationTypeTrackingTestReporterFactory implements MetricReporterFactory {
+
+		@Override
+		public MetricReporter createMetricReporter(Properties config) {
+			return new InstantiationTypeTrackingTestReporter(true);
+		}
+	}
+
+	/**
 	 * Reporter that exposes which constructor was called.
 	 */
 	protected static class InstantiationTypeTrackingTestReporter extends TestReporter {