You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/30 13:53:28 UTC

[GitHub] [flink] metaswirl commented on a change in pull request #19263: [FLINK-21585][metrics] Add options for in-/excluding metrics

metaswirl commented on a change in pull request #19263:
URL: https://github.com/apache/flink/pull/19263#discussion_r838439605



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))

Review comment:
       I find `<name#N>` non-intuitive and it is also not explained or used.
   ```suggestion
                   .text("%s", code("<scope>[:<name>[;<name>][:type[;type]]]"))
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))
+                .list(
+                        text(
+                                "scope: Filters based on the logical scope.%s"
+                                        + "Specified as a pattern where %s matches one or more characters and %s separates scope components.%s"
+                                        + "For example:%s"
+                                        + " \"%s\" matches any job-related metrics on the JobManager,%s"
+                                        + " \"%s\" matches all job-related metrics and%s"
+                                        + " \"%s\" matches all metrics below the job-level (i.e., task/operator metrics etc.).",
+                                linebreak(),
+                                code("*"),
+                                code("."),
+                                linebreak(),
+                                linebreak(),
+                                code("jobmanager.job"),
+                                linebreak(),
+                                code("*.job"),
+                                linebreak(),
+                                code("*.job.*")),
+                        text(
+                                "name: Filters based on the metric name.%s"
+                                        + "Specified as a pattern where %s matches any sequence of characters.%s"

Review comment:
       Same as above.
   ```suggestion
                                           + "Specified as a pattern where %s matches any sequence of characters. "
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link MetricFilter} implementation that filters metrics based on {@link
+ * MetricOptions#REPORTER_INCLUDES}/{@link MetricOptions#REPORTER_EXCLUDES}.
+ */
+public class DefaultMetricFilter implements MetricFilter {
+
+    private static final EnumSet<MetricType> ALL_METRIC_TYPES = EnumSet.allOf(MetricType.class);
+    private static final EnumSet<MetricType> NO_METRIC_TYPES = EnumSet.noneOf(MetricType.class);

Review comment:
       Does not appear to be used.
   
   ```suggestion
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))
+                .list(
+                        text(
+                                "scope: Filters based on the logical scope.%s"
+                                        + "Specified as a pattern where %s matches one or more characters and %s separates scope components.%s"
+                                        + "For example:%s"
+                                        + " \"%s\" matches any job-related metrics on the JobManager,%s"
+                                        + " \"%s\" matches all job-related metrics and%s"
+                                        + " \"%s\" matches all metrics below the job-level (i.e., task/operator metrics etc.).",
+                                linebreak(),
+                                code("*"),
+                                code("."),
+                                linebreak(),
+                                linebreak(),
+                                code("jobmanager.job"),
+                                linebreak(),
+                                code("*.job"),
+                                linebreak(),
+                                code("*.job.*")),
+                        text(
+                                "name: Filters based on the metric name.%s"
+                                        + "Specified as a pattern where %s matches any sequence of characters.%s"
+                                        + "For example, \"%s\" matches any metrics where the name contains %s.",
+                                linebreak(),
+                                code("*"),
+                                linebreak(),
+                                code("*Records*"),
+                                code("Records")),
+                        text(
+                                "type: Filters based on the metric type. Valid types are: %s",
+                                code("[counter, meter, gauge, histogram]")))
+                .text("Examples:")
+                .list(
+                        text(
+                                "\"%s\" Matches metrics like %s.",
+                                code("*:numRecords*"), code("numRecordsIn")),
+                        text(
+                                "\"%s\" Matches metrics like %s on the operator level.",
+                                code("*.job.task.operator:numRecords*"), code("numRecordsIn")),
+                        text(
+                                "\"%s\" Matches meter metrics like %s on the operator level.",
+                                code("*.job.task.operator:numRecords*:meter"),
+                                code("numRecordsInPerSecond")));
+    }
+
     @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX)
     @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, position = 4)
+    public static final ConfigOption<List<String>> REPORTER_INCLUDES =
+            key("filter.includes")
+                    .stringType()
+                    .asList()
+                    .defaultValues("*:*:*")
+                    .withDescription(
+                            addFilterDescription(
+                                            Description.builder()
+                                                    .text(
+                                                            "The metrics that should be included for the reporter named <name>.")
+                                                    .linebreak())

Review comment:
       Linebreak seems unnecessary
   
   ```suggestion
                                                               "The metrics that should be included for the reporter named <name>."))
   ```

##########
File path: flink-end-to-end-tests/test-scripts/common.sh
##########
@@ -630,7 +630,8 @@ function kill_random_taskmanager {
 function setup_flink_slf4j_metric_reporter() {
   INTERVAL="${1:-1 SECONDS}"
   set_config_key "metrics.reporter.slf4j.factory.class" "org.apache.flink.metrics.slf4j.Slf4jReporterFactory"
-  set_config_key "metrics.reporter.slf4j.interval" "${INTERVAL}"
+  set_config_key "metrics.reporter.slf4j.interval" "1 SECONDS"
+  set_config_key "metrics.reporter.slf4j.filter.includes" "*:${1}"

Review comment:
       I would rather replace `$1` by `$METRIC_NAME_PATTERN` or similar.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))

Review comment:
       On a more general note, do we really want to have multiple options?
   
   We would allow something like below. It's great that it's powerful, but configurations like this would be more difficult to maintain. This is fine for the type part, but I would not allow it for the name part.
   
   `"*.job.task.operator:numRecords*;debloatedBufferSize;mailboxLatency*:meter;gauge`

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))
+                .list(
+                        text(
+                                "scope: Filters based on the logical scope.%s"
+                                        + "Specified as a pattern where %s matches one or more characters and %s separates scope components.%s"
+                                        + "For example:%s"
+                                        + " \"%s\" matches any job-related metrics on the JobManager,%s"
+                                        + " \"%s\" matches all job-related metrics and%s"
+                                        + " \"%s\" matches all metrics below the job-level (i.e., task/operator metrics etc.).",
+                                linebreak(),
+                                code("*"),
+                                code("."),
+                                linebreak(),
+                                linebreak(),
+                                code("jobmanager.job"),
+                                linebreak(),
+                                code("*.job"),
+                                linebreak(),
+                                code("*.job.*")),
+                        text(
+                                "name: Filters based on the metric name.%s"
+                                        + "Specified as a pattern where %s matches any sequence of characters.%s"
+                                        + "For example, \"%s\" matches any metrics where the name contains %s.",
+                                linebreak(),
+                                code("*"),
+                                linebreak(),
+                                code("*Records*"),
+                                code("Records")),
+                        text(
+                                "type: Filters based on the metric type. Valid types are: %s",
+                                code("[counter, meter, gauge, histogram]")))
+                .text("Examples:")
+                .list(
+                        text(
+                                "\"%s\" Matches metrics like %s.",
+                                code("*:numRecords*"), code("numRecordsIn")),
+                        text(
+                                "\"%s\" Matches metrics like %s on the operator level.",
+                                code("*.job.task.operator:numRecords*"), code("numRecordsIn")),
+                        text(
+                                "\"%s\" Matches meter metrics like %s on the operator level.",
+                                code("*.job.task.operator:numRecords*:meter"),
+                                code("numRecordsInPerSecond")));
+    }
+
     @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX)
     @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, position = 4)
+    public static final ConfigOption<List<String>> REPORTER_INCLUDES =
+            key("filter.includes")
+                    .stringType()
+                    .asList()
+                    .defaultValues("*:*:*")
+                    .withDescription(
+                            addFilterDescription(
+                                            Description.builder()
+                                                    .text(
+                                                            "The metrics that should be included for the reporter named <name>.")
+                                                    .linebreak())
+                                    .build());
+
+    @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX)
+    @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, position = 5)
+    public static final ConfigOption<List<String>> REPORTER_EXCLUDES =
+            key("filter.excludes")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            addFilterDescription(

Review comment:
       Do we really want this twice in our documentation? Can we not instead refer to the previous section?

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))
+                .list(
+                        text(
+                                "scope: Filters based on the logical scope.%s"
+                                        + "Specified as a pattern where %s matches one or more characters and %s separates scope components.%s"

Review comment:
       Visually, I would use either no or two line breaks.
   ```suggestion
                                           + "Specified as a pattern where %s matches one or more characters and %s separates scope components. "
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))
+                .list(
+                        text(
+                                "scope: Filters based on the logical scope.%s"
+                                        + "Specified as a pattern where %s matches one or more characters and %s separates scope components.%s"
+                                        + "For example:%s"
+                                        + " \"%s\" matches any job-related metrics on the JobManager,%s"
+                                        + " \"%s\" matches all job-related metrics and%s"
+                                        + " \"%s\" matches all metrics below the job-level (i.e., task/operator metrics etc.).",
+                                linebreak(),
+                                code("*"),
+                                code("."),
+                                linebreak(),
+                                linebreak(),
+                                code("jobmanager.job"),
+                                linebreak(),
+                                code("*.job"),
+                                linebreak(),
+                                code("*.job.*")),
+                        text(
+                                "name: Filters based on the metric name.%s"
+                                        + "Specified as a pattern where %s matches any sequence of characters.%s"
+                                        + "For example, \"%s\" matches any metrics where the name contains %s.",
+                                linebreak(),
+                                code("*"),
+                                linebreak(),
+                                code("*Records*"),
+                                code("Records")),
+                        text(
+                                "type: Filters based on the metric type. Valid types are: %s",
+                                code("[counter, meter, gauge, histogram]")))
+                .text("Examples:")
+                .list(
+                        text(
+                                "\"%s\" Matches metrics like %s.",
+                                code("*:numRecords*"), code("numRecordsIn")),
+                        text(
+                                "\"%s\" Matches metrics like %s on the operator level.",
+                                code("*.job.task.operator:numRecords*"), code("numRecordsIn")),
+                        text(
+                                "\"%s\" Matches meter metrics like %s on the operator level.",
+                                code("*.job.task.operator:numRecords*:meter"),
+                                code("numRecordsInPerSecond")));

Review comment:
       Maybe we could give an example for multiple options as well?
   
   ```suggestion
                           text(
                                   "\"%s\" Matches counter metrics like %s and meter metrics like %s on the operator level.",
                                   code("*.job.task.operator:numRecords*:meter;counter"),
                                   code("numRecordsIn"),
                                   code("numRecordsInPerSecond")));
   ```

##########
File path: flink-end-to-end-tests/test-scripts/common.sh
##########
@@ -630,7 +630,8 @@ function kill_random_taskmanager {
 function setup_flink_slf4j_metric_reporter() {
   INTERVAL="${1:-1 SECONDS}"

Review comment:
       It looks like we don't need this anymore?
   
   ```suggestion
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
##########
@@ -380,6 +382,12 @@ public void register(Metric metric, String metricName, AbstractMetricGroup group
                 LOG.warn(
                         "Cannot register metric, because the MetricRegistry has already been shut down.");
             } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Registering metric {}.{}.",
+                            group.getLogicalScope(CharacterFilter.NO_OP_FILTER),
+                            metricName);
+                }

Review comment:
       Why do we need to check for `isDebugEnabled` here? Do you think that the call `getLogicalScope` is to expensive?
   
   ```suggestion
                   LOG.debug(
                           "Registering metric {}.{}.",
                           group.getLogicalScope(CharacterFilter.NO_OP_FILTER),
                           metricName);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link MetricFilter} implementation that filters metrics based on {@link
+ * MetricOptions#REPORTER_INCLUDES}/{@link MetricOptions#REPORTER_EXCLUDES}.
+ */
+public class DefaultMetricFilter implements MetricFilter {
+
+    private static final EnumSet<MetricType> ALL_METRIC_TYPES = EnumSet.allOf(MetricType.class);
+    private static final EnumSet<MetricType> NO_METRIC_TYPES = EnumSet.noneOf(MetricType.class);
+
+    private final List<FilterSpec> includes;
+    private final List<FilterSpec> excludes;
+
+    private DefaultMetricFilter(List<FilterSpec> includes, List<FilterSpec> excludes) {
+        this.includes = includes;
+        this.excludes = excludes;
+    }
+
+    @Override
+    public boolean filter(Metric metric, String name, String logicalScope) {
+        for (FilterSpec exclude : excludes) {
+            if (exclude.namePattern.matcher(name).matches()
+                    && exclude.scopePattern.matcher(logicalScope).matches()
+                    && exclude.types.contains(metric.getMetricType())) {
+                return false;
+            }
+        }
+        for (FilterSpec include : includes) {
+            if (include.namePattern.matcher(name).matches()
+                    && include.scopePattern.matcher(logicalScope).matches()
+                    && include.types.contains(metric.getMetricType())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static MetricFilter fromConfiguration(Configuration configuration) {
+        final List<String> includes = configuration.get(MetricOptions.REPORTER_INCLUDES);
+        final List<String> excludes = configuration.get(MetricOptions.REPORTER_EXCLUDES);
+
+        final List<FilterSpec> includeFilters =
+                includes.stream().map(i -> parse(i)).collect(Collectors.toList());
+        final List<FilterSpec> excludeFilters =
+                excludes.stream().map(e -> parse(e)).collect(Collectors.toList());
+
+        return new DefaultMetricFilter(includeFilters, excludeFilters);
+    }
+
+    private static FilterSpec parse(String filter) {
+        final String[] split = filter.split(":");
+        final Pattern scope = convertToPattern(split[0]);
+        final Pattern name = split.length > 1 ? convertToPattern(split[1]) : Pattern.compile(".+");
+        final EnumSet<MetricType> type =
+                split.length > 2 ? parseMetricTypes(split[2]) : ALL_METRIC_TYPES;
+
+        return new FilterSpec(scope, name, type);
+    }
+
+    @VisibleForTesting
+    static Pattern convertToPattern(String scopeOrNameComponent) {
+        final String[] split = scopeOrNameComponent.split(";");
+
+        final String rawPattern =
+                Arrays.stream(split)
+                        .map(s -> s.replaceAll("\\.", "\\."))
+                        .map(s -> s.replaceAll("\\*", ".+"))
+                        .map(s -> s)
+                        .collect(Collectors.joining("|", "(", ")"));
+
+        return Pattern.compile(rawPattern);
+    }
+
+    @VisibleForTesting
+    static EnumSet<MetricType> parseMetricTypes(String typeComponent) {

Review comment:
       If we have a metric type that is illegal (e.g., `garbage`) we throw an `IllegalArgumentException` here: `java.lang.IllegalArgumentException: No enum constant org.apache.flink.metrics.MetricType.GARBAGE`.
   
   I guess this is fine, but we could provide a more helpful exception message. Something along: `"garbage" is not a valid metric type`. 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link MetricFilter} implementation that filters metrics based on {@link
+ * MetricOptions#REPORTER_INCLUDES}/{@link MetricOptions#REPORTER_EXCLUDES}.
+ */
+public class DefaultMetricFilter implements MetricFilter {
+
+    private static final EnumSet<MetricType> ALL_METRIC_TYPES = EnumSet.allOf(MetricType.class);
+    private static final EnumSet<MetricType> NO_METRIC_TYPES = EnumSet.noneOf(MetricType.class);
+
+    private final List<FilterSpec> includes;
+    private final List<FilterSpec> excludes;
+
+    private DefaultMetricFilter(List<FilterSpec> includes, List<FilterSpec> excludes) {
+        this.includes = includes;
+        this.excludes = excludes;
+    }
+
+    @Override
+    public boolean filter(Metric metric, String name, String logicalScope) {
+        for (FilterSpec exclude : excludes) {
+            if (exclude.namePattern.matcher(name).matches()
+                    && exclude.scopePattern.matcher(logicalScope).matches()
+                    && exclude.types.contains(metric.getMetricType())) {
+                return false;
+            }
+        }
+        for (FilterSpec include : includes) {
+            if (include.namePattern.matcher(name).matches()
+                    && include.scopePattern.matcher(logicalScope).matches()
+                    && include.types.contains(metric.getMetricType())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static MetricFilter fromConfiguration(Configuration configuration) {
+        final List<String> includes = configuration.get(MetricOptions.REPORTER_INCLUDES);
+        final List<String> excludes = configuration.get(MetricOptions.REPORTER_EXCLUDES);
+
+        final List<FilterSpec> includeFilters =
+                includes.stream().map(i -> parse(i)).collect(Collectors.toList());
+        final List<FilterSpec> excludeFilters =
+                excludes.stream().map(e -> parse(e)).collect(Collectors.toList());
+
+        return new DefaultMetricFilter(includeFilters, excludeFilters);
+    }
+
+    private static FilterSpec parse(String filter) {
+        final String[] split = filter.split(":");
+        final Pattern scope = convertToPattern(split[0]);
+        final Pattern name = split.length > 1 ? convertToPattern(split[1]) : Pattern.compile(".+");
+        final EnumSet<MetricType> type =
+                split.length > 2 ? parseMetricTypes(split[2]) : ALL_METRIC_TYPES;
+
+        return new FilterSpec(scope, name, type);
+    }
+
+    @VisibleForTesting
+    static Pattern convertToPattern(String scopeOrNameComponent) {
+        final String[] split = scopeOrNameComponent.split(";");
+
+        final String rawPattern =
+                Arrays.stream(split)
+                        .map(s -> s.replaceAll("\\.", "\\."))
+                        .map(s -> s.replaceAll("\\*", ".+"))

Review comment:
       This means "numRecordsIn*" does no match "numRecordsIn" but "numRecordsInPerSecond". Is this a common approach for `*`? I know that for globbing `*` means any sequence of characters (incl none). I would replace it by `".*"` (and also above) or consider using `+` instead.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link MetricFilter} implementation that filters metrics based on {@link
+ * MetricOptions#REPORTER_INCLUDES}/{@link MetricOptions#REPORTER_EXCLUDES}.
+ */
+public class DefaultMetricFilter implements MetricFilter {
+
+    private static final EnumSet<MetricType> ALL_METRIC_TYPES = EnumSet.allOf(MetricType.class);
+    private static final EnumSet<MetricType> NO_METRIC_TYPES = EnumSet.noneOf(MetricType.class);
+
+    private final List<FilterSpec> includes;
+    private final List<FilterSpec> excludes;
+
+    private DefaultMetricFilter(List<FilterSpec> includes, List<FilterSpec> excludes) {
+        this.includes = includes;
+        this.excludes = excludes;
+    }
+
+    @Override
+    public boolean filter(Metric metric, String name, String logicalScope) {
+        for (FilterSpec exclude : excludes) {
+            if (exclude.namePattern.matcher(name).matches()
+                    && exclude.scopePattern.matcher(logicalScope).matches()
+                    && exclude.types.contains(metric.getMetricType())) {
+                return false;
+            }
+        }
+        for (FilterSpec include : includes) {
+            if (include.namePattern.matcher(name).matches()
+                    && include.scopePattern.matcher(logicalScope).matches()
+                    && include.types.contains(metric.getMetricType())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static MetricFilter fromConfiguration(Configuration configuration) {
+        final List<String> includes = configuration.get(MetricOptions.REPORTER_INCLUDES);
+        final List<String> excludes = configuration.get(MetricOptions.REPORTER_EXCLUDES);
+
+        final List<FilterSpec> includeFilters =
+                includes.stream().map(i -> parse(i)).collect(Collectors.toList());
+        final List<FilterSpec> excludeFilters =
+                excludes.stream().map(e -> parse(e)).collect(Collectors.toList());
+
+        return new DefaultMetricFilter(includeFilters, excludeFilters);
+    }
+
+    private static FilterSpec parse(String filter) {
+        final String[] split = filter.split(":");
+        final Pattern scope = convertToPattern(split[0]);
+        final Pattern name = split.length > 1 ? convertToPattern(split[1]) : Pattern.compile(".+");
+        final EnumSet<MetricType> type =
+                split.length > 2 ? parseMetricTypes(split[2]) : ALL_METRIC_TYPES;
+
+        return new FilterSpec(scope, name, type);
+    }
+
+    @VisibleForTesting
+    static Pattern convertToPattern(String scopeOrNameComponent) {
+        final String[] split = scopeOrNameComponent.split(";");
+
+        final String rawPattern =
+                Arrays.stream(split)
+                        .map(s -> s.replaceAll("\\.", "\\."))
+                        .map(s -> s.replaceAll("\\*", ".+"))
+                        .map(s -> s)

Review comment:
       ```suggestion
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricType;
+import org.apache.flink.metrics.util.TestCounter;
+import org.apache.flink.metrics.util.TestMeter;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Execution(ExecutionMode.CONCURRENT)
+class DefaultMetricFilterTest {
+
+    private static final Counter COUNTER = new TestCounter();
+    private static final Meter METER = new TestMeter();
+
+    @Test
+    void testConvertToPatternWithoutWildcards() {
+        final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecordsIn");
+        assertThat(pattern.toString()).isEqualTo("(numRecordsIn)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testConvertToPatternSingle() {
+        final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecords*");
+        assertThat(pattern.toString()).isEqualTo("(numRecords.+)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testConvertToPatternMultiple() {
+        final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecords*;numBytes*");
+        assertThat(pattern.toString()).isEqualTo("(numRecords.+|numBytes.+)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytes").matches()).isEqualTo(false);
+        assertThat(pattern.matcher("hello").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testParseMetricTypesSingle() {
+        final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter");
+        assertThat(types).containsExactly(MetricType.METER);
+    }
+
+    @Test
+    void testParseMetricTypesMultiple() {
+        final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter;counter");
+        assertThat(types).containsExactlyInAnyOrder(MetricType.METER, MetricType.COUNTER);
+    }
+
+    @Test
+    void testParseMetricTypesCaseIgnored() {
+        final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter;CoUnTeR");
+        assertThat(types).containsExactlyInAnyOrder(MetricType.METER, MetricType.COUNTER);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByScope() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                MetricOptions.REPORTER_INCLUDES, Arrays.asList("include1:*:*", "include2.*:*:*"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "include1")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", "include1.bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2.bar")).isEqualTo(true);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByName() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:name:*"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByType() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:counter"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByScope() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("include1", "include2.*"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "include1")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", "include1.bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2.bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByName() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:faa*", "*:foo"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "foob", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "faab", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByType() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:*:meter"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationIncludeDefault() {

Review comment:
       Isn't this related to `testFromConfigurationExcludeByType`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricType;
+import org.apache.flink.metrics.util.TestCounter;
+import org.apache.flink.metrics.util.TestMeter;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Execution(ExecutionMode.CONCURRENT)
+class DefaultMetricFilterTest {
+
+    private static final Counter COUNTER = new TestCounter();
+    private static final Meter METER = new TestMeter();
+
+    @Test
+    void testConvertToPatternWithoutWildcards() {
+        final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecordsIn");
+        assertThat(pattern.toString()).isEqualTo("(numRecordsIn)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testConvertToPatternSingle() {
+        final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecords*");
+        assertThat(pattern.toString()).isEqualTo("(numRecords.+)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testConvertToPatternMultiple() {
+        final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecords*;numBytes*");
+        assertThat(pattern.toString()).isEqualTo("(numRecords.+|numBytes.+)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytes").matches()).isEqualTo(false);
+        assertThat(pattern.matcher("hello").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testParseMetricTypesSingle() {
+        final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter");
+        assertThat(types).containsExactly(MetricType.METER);
+    }
+
+    @Test
+    void testParseMetricTypesMultiple() {
+        final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter;counter");
+        assertThat(types).containsExactlyInAnyOrder(MetricType.METER, MetricType.COUNTER);
+    }
+
+    @Test
+    void testParseMetricTypesCaseIgnored() {
+        final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter;CoUnTeR");
+        assertThat(types).containsExactlyInAnyOrder(MetricType.METER, MetricType.COUNTER);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByScope() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                MetricOptions.REPORTER_INCLUDES, Arrays.asList("include1:*:*", "include2.*:*:*"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "include1")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", "include1.bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2.bar")).isEqualTo(true);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByName() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:name:*"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByType() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:counter"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByScope() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("include1", "include2.*"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "include1")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", "include1.bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", "include2.bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByName() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:faa*", "*:foo"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "foob", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "faab", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByType() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:*:meter"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationIncludeDefault() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:*:meter"));
+
+        final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "hello")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "hello")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "foo", "hello")).isEqualTo(true);
+    }
+
+    @Test
+    void testFromConfigurationExcludeDefault() {

Review comment:
       Isn't this rather include by default?

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
##########
@@ -117,8 +119,88 @@
                     .withDescription(
                             "The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).");
 
+    private static final Description.DescriptionBuilder addFilterDescription(
+            Description.DescriptionBuilder description) {
+        return description
+                .text("Filters are specified as a list, with each filter following this format:")
+                .linebreak()
+                .text("%s", code("<scope>[:<name>[;<name#N>][:type[;type#N]]]"))
+                .list(
+                        text(
+                                "scope: Filters based on the logical scope.%s"
+                                        + "Specified as a pattern where %s matches one or more characters and %s separates scope components.%s"
+                                        + "For example:%s"
+                                        + " \"%s\" matches any job-related metrics on the JobManager,%s"
+                                        + " \"%s\" matches all job-related metrics and%s"
+                                        + " \"%s\" matches all metrics below the job-level (i.e., task/operator metrics etc.).",
+                                linebreak(),
+                                code("*"),
+                                code("."),
+                                linebreak(),
+                                linebreak(),
+                                code("jobmanager.job"),
+                                linebreak(),
+                                code("*.job"),
+                                linebreak(),
+                                code("*.job.*")),
+                        text(
+                                "name: Filters based on the metric name.%s"
+                                        + "Specified as a pattern where %s matches any sequence of characters.%s"

Review comment:
       If we stick with `*` maps to `'.+'`, then I would rather use the wording from above.
   
   ```suggestion
                                           + "Specified as a pattern where %s matches one or more characters.%s"
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org