You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/03 09:25:24 UTC

[flink] branch master updated: [FLINK-12359][metrics][tests] Harden SystemResourcesMetricsITCase

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e72c9d8  [FLINK-12359][metrics][tests] Harden SystemResourcesMetricsITCase
e72c9d8 is described below

commit e72c9d8881faf4681c1c1aa229e79e77a89ec15f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 3 11:25:10 2019 +0200

    [FLINK-12359][metrics][tests] Harden SystemResourcesMetricsITCase
---
 .../metrics/SystemResourcesMetricsITCase.java      | 62 +++++++++++-----------
 1 file changed, 31 insertions(+), 31 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index b875d97..7801a28 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -19,9 +19,12 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
@@ -30,16 +33,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 /**
  * Integration tests for proper initialization of the system resource metrics.
@@ -58,6 +62,8 @@ public class SystemResourcesMetricsITCase extends TestLogger {
 		Configuration configuration = new Configuration();
 		configuration.setBoolean(SYSTEM_RESOURCE_METRICS, true);
 		configuration.setString(REPORTERS_LIST, "test_reporter");
+		configuration.setString(MetricOptions.SCOPE_NAMING_JM, "jobmanager");
+		configuration.setString(MetricOptions.SCOPE_NAMING_TM, "taskmanager");
 		configuration.setString("metrics.reporter.test_reporter.class", TestReporter.class.getName());
 		return configuration;
 	}
@@ -67,25 +73,11 @@ public class SystemResourcesMetricsITCase extends TestLogger {
 		assertEquals(1, TestReporter.OPENED_REPORTERS.size());
 		TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
 
-		List<String> expectedPatterns = getExpectedPatterns();
-
-		Collection<String> gaugeNames = reporter.getGauges().values();
-
-		for (String expectedPattern : expectedPatterns) {
-			boolean found = false;
-			for (String gaugeName : gaugeNames) {
-				if (gaugeName.matches(expectedPattern)) {
-					found = true;
-				}
-			}
-			if (!found) {
-				fail(String.format("Failed to find gauge [%s] in registered gauges [%s]", expectedPattern, gaugeNames));
-			}
-		}
+		reporter.patternsExhaustedFuture.get(10, TimeUnit.SECONDS);
 	}
 
 	private static List<String> getExpectedPatterns() {
-		String[] expectedGauges = new String[] {
+		String[] expectedGauges = {
 			"System.CPU.Idle",
 			"System.CPU.Sys",
 			"System.CPU.User",
@@ -101,9 +93,9 @@ public class SystemResourcesMetricsITCase extends TestLogger {
 			"System.Network.*SendRate"
 		};
 
-		String[] expectedHosts = new String[] {
-			"localhost.taskmanager.([a-f0-9\\\\-])*.",
-			"localhost.jobmanager."
+		String[] expectedHosts = {
+			"taskmanager.",
+			"jobmanager."
 		};
 
 		List<String> patterns = new ArrayList<>();
@@ -118,13 +110,11 @@ public class SystemResourcesMetricsITCase extends TestLogger {
 	/**
 	 * Test metric reporter that exposes registered metrics.
 	 */
-	public static final class TestReporter extends AbstractReporter {
+	public static final class TestReporter implements MetricReporter {
 		public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
-
-		@Override
-		public String filterCharacters(String input) {
-			return input;
-		}
+		private final Map<String, CompletableFuture<Void>> patternFutures = getExpectedPatterns().stream()
+			.collect(Collectors.toMap(pattern -> pattern, pattern -> new CompletableFuture<>()));
+		private final CompletableFuture<Void> patternsExhaustedFuture = FutureUtils.waitForAll(patternFutures.values());
 
 		@Override
 		public void open(MetricConfig config) {
@@ -136,8 +126,18 @@ public class SystemResourcesMetricsITCase extends TestLogger {
 			OPENED_REPORTERS.remove(this);
 		}
 
-		public Map<Gauge<?>, String> getGauges() {
-			return gauges;
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+			final String metricIdentifier = group.getMetricIdentifier(metricName, name -> name);
+			for (final String expectedPattern : patternFutures.keySet()) {
+				if (metricIdentifier.matches(expectedPattern)) {
+					patternFutures.get(expectedPattern).complete(null);
+				}
+			}
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
 		}
 	}
 }