You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:32 UTC
[05/10] flink git commit: [FLINK-8080][metrics] metrics.reporters now
optional include list
[FLINK-8080][metrics] metrics.reporters now optional include list
This closes #5099.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/493c2857
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/493c2857
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/493c2857
Branch: refs/heads/master
Commit: 493c28571f22d9dde4edbec0ba38f2761fb51335
Parents: 42b0114
Author: zentol <ch...@apache.org>
Authored: Wed Nov 15 12:56:30 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:15 2017 +0100
----------------------------------------------------------------------
docs/monitoring/metrics.md | 9 +---
.../connectors/kafka/KafkaTestBase.java | 2 -
.../flink/configuration/MetricOptions.java | 8 ++--
.../ScheduledDropwizardReporterTest.java | 1 -
.../DropwizardFlinkHistogramWrapperTest.java | 2 -
.../flink/metrics/jmx/JMXReporterTest.java | 6 ---
.../jobmanager/JMXJobManagerMetricTest.java | 1 -
.../prometheus/PrometheusReporterTest.java | 2 -
.../flink/metrics/slf4j/Slf4jReporterTest.java | 1 -
.../metrics/statsd/StatsDReporterTest.java | 3 --
.../metrics/MetricRegistryConfiguration.java | 45 +++++++++++++++++---
.../runtime/metrics/MetricRegistryImpl.java | 2 +-
.../runtime/metrics/MetricRegistryImplTest.java | 13 ++----
.../metrics/groups/AbstractMetricGroupTest.java | 1 -
.../groups/MetricGroupRegistrationTest.java | 2 -
15 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index e4b5161..2a9ab50 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -560,11 +560,11 @@ counter = getRuntimeContext()
Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These
reporters will be instantiated on each job and task manager when they are started.
-- `metrics.reporters`: The list of named reporters.
- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
- `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`.
+- `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.
All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
we will list more settings specific to each reporter.
@@ -606,7 +606,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
@@ -642,7 +641,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: gang
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
@@ -668,7 +666,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
@@ -689,7 +686,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
{% endhighlight %}
@@ -719,7 +715,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
@@ -743,7 +738,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: dghttp
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod
@@ -760,7 +754,6 @@ Example configuration:
{% highlight yaml %}
-metrics.reporters: slf4j
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 74485b4..f471cd4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -21,7 +21,6 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -136,7 +135,6 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
return flinkConfig;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 8a328ac..42eb575 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -25,12 +25,12 @@ import static org.apache.flink.configuration.ConfigOptions.key;
public class MetricOptions {
/**
- * The list of named reporters. Names are defined here and per-reporter configs
- * are given with the reporter config prefix and the reporter name.
+ * An optional list of reporter names. If configured, only reporters whose name matches any of the names in the list
+ * will be started. Otherwise, all reporters that could be found in the configuration will be started.
*
- * Example:
+ * <p>Example:
* <pre>{@code
- * metrics.reporters = foo, bar
+ * metrics.reporters = foo,bar
*
* metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
* metrics.reporter.foo.interval = 10
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 3fa0474..4a2ca3a 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -82,7 +82,6 @@ public class ScheduledDropwizardReporterTest {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
- configuration.setString(MetricOptions.REPORTERS_LIST, "test");
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 8f70abb..fb21a75b 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.dropwizard.metrics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
@@ -101,7 +100,6 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
int size = 10;
String histogramMetricName = "histogram";
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 067b08f..40b7f15 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.metrics.jmx;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestHistogram;
@@ -100,7 +99,6 @@ public class JMXReporterTest extends TestLogger {
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
- cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
@@ -160,8 +158,6 @@ public class JMXReporterTest extends TestLogger {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
- cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
@@ -236,7 +232,6 @@ public class JMXReporterTest extends TestLogger {
try {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -286,7 +281,6 @@ public class JMXReporterTest extends TestLogger {
try {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 7280476..c74db28 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -64,7 +64,6 @@ public class JMXJobManagerMetricTest {
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
- flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test");
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index d905c25..39cf4be 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.metrics.prometheus;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
@@ -279,7 +278,6 @@ public class PrometheusReporterTest extends TestLogger {
static Configuration createConfigWithOneReporter(String reporterName, String portString) {
Configuration cfg = new Configuration();
- cfg.setString(MetricOptions.REPORTERS_LIST, reporterName);
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ARG_PORT, portString);
return cfg;
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index f538bc7..b344f45 100644
--- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -63,7 +63,6 @@ public class Slf4jReporterTest extends TestLogger {
TestUtils.addTestAppenderForRootLogger();
Configuration configuration = new Configuration();
- configuration.setString(MetricOptions.REPORTERS_LIST, "slf4j");
configuration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "slf4j." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, Slf4jReporter.class.getName());
configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 275f2e1..08d4998 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -81,7 +81,6 @@ public class StatsDReporterTest extends TestLogger {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";
- configuration.setString(MetricOptions.REPORTERS_LIST, "test");
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
@@ -151,7 +150,6 @@ public class StatsDReporterTest extends TestLogger {
int port = receiver.getPort();
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
@@ -225,7 +223,6 @@ public class StatsDReporterTest extends TestLogger {
int port = receiver.getPort();
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index d07cb65..7188a59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -32,7 +32,11 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Configuration object for {@link MetricRegistryImpl}.
@@ -44,7 +48,14 @@ public class MetricRegistryConfiguration {
private static volatile MetricRegistryConfiguration defaultConfiguration;
// regex pattern to split the defined reporters
- private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
+ private static final Pattern reporterListPattern = Pattern.compile("\\s*,\\s*");
+
+ // regex pattern to extract the name from reporter configuration keys, e.g. "rep" from "metrics.reporter.rep.class"
+ private static final Pattern reporterClassPattern = Pattern.compile(
+ Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
+ // [\S&&[^.]] = intersection of non-whitespace and non-period character classes
+ "([\\S&&[^.]]*)\\." +
+ Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
// scope formats for the different components
private final ScopeFormats scopeFormats;
@@ -108,15 +119,37 @@ public class MetricRegistryConfiguration {
delim = '.';
}
- final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
+ String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, "");
+ Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
+ .collect(Collectors.toSet());
+
+ // use a TreeSet to make the reporter order deterministic, which is useful for testing
+ Set<String> namedReporters = new TreeSet<>(String::compareTo);
+ // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations
+ for (String key : configuration.keySet()) {
+ if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+ Matcher matcher = reporterClassPattern.matcher(key);
+ if (matcher.matches()) {
+ String reporterName = matcher.group(1);
+ if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) {
+ if (namedReporters.contains(reporterName)) {
+ LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName);
+ } else {
+ namedReporters.add(reporterName);
+ }
+ } else {
+ LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString);
+ }
+ }
+ }
+ }
+
List<Tuple2<String, Configuration>> reporterConfigurations;
- if (definedReporters == null) {
+ if (namedReporters.isEmpty()) {
reporterConfigurations = Collections.emptyList();
} else {
- String[] namedReporters = splitPattern.split(definedReporters);
-
- reporterConfigurations = new ArrayList<>(namedReporters.length);
+ reporterConfigurations = new ArrayList<>(namedReporters.size());
for (String namedReporter: namedReporters) {
DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index d36d095..c8f4490 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -137,7 +137,7 @@ public class MetricRegistryImpl implements MetricRegistry {
MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
- LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
+ LOG.info("Configuring {} with {}.", namedReporter, metricConfig);
reporterInstance.open(metricConfig);
if (reporterInstance instanceof Scheduled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index b0b20b2..2eccc0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -70,20 +70,22 @@ public class MetricRegistryImplTest extends TestLogger {
}
/**
- * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
+ * Verifies that the reporter name list is correctly used to determine which reporters should be instantiated.
*/
@Test
- public void testReporterInstantiation() {
+ public void testReporterInclusion() {
Configuration config = new Configuration();
config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
assertTrue(metricRegistry.getReporters().size() == 1);
Assert.assertTrue(TestReporter1.wasOpened);
+ Assert.assertFalse(TestReporter11.wasOpened);
metricRegistry.shutdown();
}
@@ -107,7 +109,6 @@ public class MetricRegistryImplTest extends TestLogger {
public void testMultipleReporterInstantiation() {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
@@ -166,7 +167,6 @@ public class MetricRegistryImplTest extends TestLogger {
public void testReporterArgumentForwarding() {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
@@ -197,7 +197,6 @@ public class MetricRegistryImplTest extends TestLogger {
public void testReporterScheduling() throws InterruptedException {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -247,7 +246,6 @@ public class MetricRegistryImplTest extends TestLogger {
@Test
public void testReporterNotifications() {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
@@ -356,7 +354,6 @@ public class MetricRegistryImplTest extends TestLogger {
@Test
public void testConfigurableDelimiterForReporters() {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -379,7 +376,6 @@ public class MetricRegistryImplTest extends TestLogger {
@Test
public void testConfigurableDelimiterForReportersInGroup() {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -459,7 +455,6 @@ public class MetricRegistryImplTest extends TestLogger {
public void testExceptionIsolation() throws Exception {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 8d91b81..325982b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -84,7 +84,6 @@ public class AbstractMetricGroupTest {
public void testScopeCachingForMultipleReporters() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 324bb73..bcdcd63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
@@ -47,7 +46,6 @@ public class MetricGroupRegistrationTest extends TestLogger {
@Test
public void testMetricInstantiation() {
Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));