You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/03 09:25:24 UTC
[flink] branch master updated: [FLINK-12359][metrics][tests] Harden
SystemResourcesMetricsITCase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e72c9d8 [FLINK-12359][metrics][tests] Harden SystemResourcesMetricsITCase
e72c9d8 is described below
commit e72c9d8881faf4681c1c1aa229e79e77a89ec15f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 3 11:25:10 2019 +0200
[FLINK-12359][metrics][tests] Harden SystemResourcesMetricsITCase
---
.../metrics/SystemResourcesMetricsITCase.java | 62 +++++++++++-----------
1 file changed, 31 insertions(+), 31 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index b875d97..7801a28 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -19,9 +19,12 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
@@ -30,16 +33,17 @@ import org.junit.ClassRule;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
/**
* Integration tests for proper initialization of the system resource metrics.
@@ -58,6 +62,8 @@ public class SystemResourcesMetricsITCase extends TestLogger {
Configuration configuration = new Configuration();
configuration.setBoolean(SYSTEM_RESOURCE_METRICS, true);
configuration.setString(REPORTERS_LIST, "test_reporter");
+ configuration.setString(MetricOptions.SCOPE_NAMING_JM, "jobmanager");
+ configuration.setString(MetricOptions.SCOPE_NAMING_TM, "taskmanager");
configuration.setString("metrics.reporter.test_reporter.class", TestReporter.class.getName());
return configuration;
}
@@ -67,25 +73,11 @@ public class SystemResourcesMetricsITCase extends TestLogger {
assertEquals(1, TestReporter.OPENED_REPORTERS.size());
TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
- List<String> expectedPatterns = getExpectedPatterns();
-
- Collection<String> gaugeNames = reporter.getGauges().values();
-
- for (String expectedPattern : expectedPatterns) {
- boolean found = false;
- for (String gaugeName : gaugeNames) {
- if (gaugeName.matches(expectedPattern)) {
- found = true;
- }
- }
- if (!found) {
- fail(String.format("Failed to find gauge [%s] in registered gauges [%s]", expectedPattern, gaugeNames));
- }
- }
+ reporter.patternsExhaustedFuture.get(10, TimeUnit.SECONDS);
}
private static List<String> getExpectedPatterns() {
- String[] expectedGauges = new String[] {
+ String[] expectedGauges = {
"System.CPU.Idle",
"System.CPU.Sys",
"System.CPU.User",
@@ -101,9 +93,9 @@ public class SystemResourcesMetricsITCase extends TestLogger {
"System.Network.*SendRate"
};
- String[] expectedHosts = new String[] {
- "localhost.taskmanager.([a-f0-9\\\\-])*.",
- "localhost.jobmanager."
+ String[] expectedHosts = {
+ "taskmanager.",
+ "jobmanager."
};
List<String> patterns = new ArrayList<>();
@@ -118,13 +110,11 @@ public class SystemResourcesMetricsITCase extends TestLogger {
/**
* Test metric reporter that exposes registered metrics.
*/
- public static final class TestReporter extends AbstractReporter {
+ public static final class TestReporter implements MetricReporter {
public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
-
- @Override
- public String filterCharacters(String input) {
- return input;
- }
+ private final Map<String, CompletableFuture<Void>> patternFutures = getExpectedPatterns().stream()
+ .collect(Collectors.toMap(pattern -> pattern, pattern -> new CompletableFuture<>()));
+ private final CompletableFuture<Void> patternsExhaustedFuture = FutureUtils.waitForAll(patternFutures.values());
@Override
public void open(MetricConfig config) {
@@ -136,8 +126,18 @@ public class SystemResourcesMetricsITCase extends TestLogger {
OPENED_REPORTERS.remove(this);
}
- public Map<Gauge<?>, String> getGauges() {
- return gauges;
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+ final String metricIdentifier = group.getMetricIdentifier(metricName, name -> name);
+ for (final String expectedPattern : patternFutures.keySet()) {
+ if (metricIdentifier.matches(expectedPattern)) {
+ patternFutures.get(expectedPattern).complete(null);
+ }
+ }
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
}
}
}