You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/15 12:05:09 UTC

[GitHub] zentol closed pull request #7095: [FLINK-10857][metrics] Cache logical scopes separately for each reporter

zentol closed pull request #7095: [FLINK-10857][metrics] Cache logical scopes separately for each reporter
URL: https://github.com/apache/flink/pull/7095
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 909915f216e..4400b14a10d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -90,9 +90,9 @@
 	 * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
 	private final String[] scopeStrings;
 
-	/** The logical metrics scope represented by this group, as a concatenated string, lazily computed.
+	/** The logical metrics scope represented by this group for each reporter, as a concatenated string, lazily computed.
 	 * For example: "taskmanager.job.task" */
-	private String logicalScopeString;
+	private String[] logicalScopeStrings;
 
 	/** The metrics query service scope represented by this group, lazily computed. */
 	protected QueryScopeInfo queryServiceScopeInfo;
@@ -107,6 +107,7 @@ public AbstractMetricGroup(MetricRegistry registry, String[] scope, A parent) {
 		this.scopeComponents = checkNotNull(scope);
 		this.parent = parent;
 		this.scopeStrings = new String[registry.getNumberReporters()];
+		this.logicalScopeStrings = new String[registry.getNumberReporters()];
 	}
 
 	public Map<String, String> getAllVariables() {
@@ -152,14 +153,34 @@ public String getLogicalScope(CharacterFilter filter) {
 	 * @return logical scope
 	 */
 	public String getLogicalScope(CharacterFilter filter, char delimiter) {
-		if (logicalScopeString == null) {
-			if (parent == null) {
-				logicalScopeString = getGroupName(filter);
-			} else {
-				logicalScopeString = parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter);
+		return getLogicalScope(filter, delimiter, -1);
+	}
+
+	/**
+	 * Returns the logical scope of this group, for example
+	 * {@code "taskmanager.job.task"}.
+	 *
+	 * @param filter character filter which is applied to the scope components
+	 * @param delimiter delimiter to use for concatenating scope components
+	 * @param reporterIndex index of the reporter
+	 * @return logical scope
+	 */
+	String getLogicalScope(CharacterFilter filter, char delimiter, int reporterIndex) {
+		if (logicalScopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= logicalScopeStrings.length)) {
+			return createLogicalScope(filter, delimiter);
+		} else {
+			if (logicalScopeStrings[reporterIndex] == null) {
+				logicalScopeStrings[reporterIndex] = createLogicalScope(filter, delimiter);
 			}
+			return logicalScopeStrings[reporterIndex];
 		}
-		return logicalScopeString;
+	}
+
+	private String createLogicalScope(CharacterFilter filter, char delimiter) {
+		final String groupName = getGroupName(filter);
+		return parent == null
+			? groupName
+			: parent.getLogicalScope(filter, delimiter) + delimiter + groupName;
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
index 63842fef9d6..64397d33db6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
@@ -52,6 +52,6 @@ public String getLogicalScope(CharacterFilter filter) {
 	}
 
 	public String getLogicalScope(CharacterFilter filter, char delimiter) {
-		return parentMetricGroup.getLogicalScope(filter, delimiter);
+		return parentMetricGroup.getLogicalScope(filter, delimiter, this.reporterIndex);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f8ed3c6a6d8..f3f8b42b851 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -105,6 +105,30 @@ public void testScopeCachingForMultipleReporters() throws Exception {
 		}
 	}
 
+	@Test
+	public void testLogicalScopeCachingForMultipleReporters() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter1.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter2.class.getName());
+
+		MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+		try {
+			MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id")
+				.addGroup("B")
+				.addGroup("C");
+			tmGroup.counter("1");
+			assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
+			for (MetricReporter reporter : testRegistry.getReporters()) {
+				ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
+				if (typedReporter.failureCause != null) {
+					throw typedReporter.failureCause;
+				}
+			}
+		} finally {
+			testRegistry.shutdown().get();
+		}
+	}
+
 	private abstract static class ScopeCheckingTestReporter extends TestReporter {
 		protected Exception failureCause;
 
@@ -175,6 +199,38 @@ public String filterCharacters(String input) {
 		}
 	}
 
+	/**
+	 * Reporter that verifies the logical-scope caching behavior.
+	 */
+	public static final class LogicalScopeReporter1 extends ScopeCheckingTestReporter {
+		@Override
+		public String filterCharacters(String input) {
+			return FILTER_B.filterCharacters(input);
+		}
+
+		@Override
+		public void checkScopes(Metric metric, String metricName, MetricGroup group) {
+			final String logicalScope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, '-');
+			assertEquals("taskmanager-X-C", logicalScope);
+		}
+	}
+
+	/**
+	 * Reporter that verifies the logical-scope caching behavior.
+	 */
+	public static final class LogicalScopeReporter2 extends ScopeCheckingTestReporter {
+		@Override
+		public String filterCharacters(String input) {
+			return FILTER_C.filterCharacters(input);
+		}
+
+		@Override
+		public void checkScopes(Metric metric, String metricName, MetricGroup group) {
+			final String logicalScope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, ',');
+			assertEquals("taskmanager,B,X", logicalScope);
+		}
+	}
+
 	@Test
 	public void testScopeGenerationWithoutReporters() throws Exception {
 		Configuration config = new Configuration();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services