You are viewing a plain-text version of this content; the canonical version is available at the original archive link.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/06 12:50:27 UTC

[GitHub] zentol closed pull request #6195: [FLINK-9543][METRICS] Expose JobMaster ID to metric system

zentol closed pull request #6195: [FLINK-9543][METRICS] Expose JobMaster ID to metric system
URL: https://github.com/apache/flink/pull/6195
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index a267abb4137..9a2619f1e1a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -334,9 +334,10 @@ protected void startClusterComponents(
 			LOG.debug("Starting Dispatcher REST endpoint.");
 			webMonitorEndpoint.start();
 
+			final ResourceID resourceManagerID = ResourceID.generate();
 			resourceManager = createResourceManager(
 				configuration,
-				ResourceID.generate(),
+				resourceManagerID,
 				rpcService,
 				highAvailabilityServices,
 				heartbeatServices,
@@ -345,7 +346,7 @@ protected void startClusterComponents(
 				clusterInformation,
 				webMonitorEndpoint.getRestBaseUrl());
 
-			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
+			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress(), resourceManagerID.getResourceIdString());
 
 			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index e09051d7160..db813255791 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -39,10 +39,12 @@
 	private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
 
 	private final String hostname;
+	private final String jobManagerId;
 
-	public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
+	public JobManagerMetricGroup(MetricRegistry registry, String hostname, String jobManagerId) {
 		super(registry, registry.getScopeFormats().getJobManagerFormat().formatScope(hostname), null);
 		this.hostname = hostname;
+		this.jobManagerId = jobManagerId;
 	}
 
 	public String hostname() {
@@ -102,6 +104,7 @@ public int numRegisteredJobMetricGroups() {
 	@Override
 	protected void putVariables(Map<String, String> variables) {
 		variables.put(ScopeFormat.SCOPE_HOST, hostname);
+		variables.put(ScopeFormat.SCOPE_JOBMANAGER_ID, jobManagerId);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa642f9..de2f7303e30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -62,9 +63,10 @@ public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() {
 	 */
 	public static class UnregisteredJobManagerMetricGroup extends JobManagerMetricGroup {
 		private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
+		private static final String DEFAULT_JOBMANAGER_ID = ResourceID.generate().getResourceIdString();
 
 		private UnregisteredJobManagerMetricGroup() {
-			super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME);
+			super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_JOBMANAGER_ID);
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index eb5ea7d4d29..898578076ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -70,6 +70,10 @@ public String filterCharacters(String input) {
 
 	public static final String SCOPE_HOST = asVariable("host");
 
+	// ----- Job Manager ----
+
+	public static final String SCOPE_JOBMANAGER_ID = asVariable("jm_id");
+
 	// ----- Task Manager ----
 
 	public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 3fd268a1aeb..b350bcf400f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -57,10 +57,12 @@ private MetricUtils() {
 
 	public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
 			final MetricRegistry metricRegistry,
-			final String hostname) {
+			final String hostname,
+			final String jobManagerId) {
 		final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup(
 			metricRegistry,
-			hostname);
+			hostname,
+			jobManagerId);
 
 		MetricGroup statusGroup = jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index aca4fdbea1f..409f78a59f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -296,7 +296,9 @@ public void start() throws Exception {
 
 				// bring up the ResourceManager(s)
 				LOG.info("Starting ResourceManger");
+				final ResourceID resourceManagerID = ResourceID.generate();
 				resourceManagerRunner = startResourceManager(
+					resourceManagerID,
 					configuration,
 					haServices,
 					heartbeatServices,
@@ -356,7 +358,7 @@ public void start() throws Exception {
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for JobManger");
 
-				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
+				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost", resourceManagerID.getResourceIdString());
 
 				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
 
@@ -762,6 +764,7 @@ protected RpcService createRpcService(
 	}
 
 	protected ResourceManagerRunner startResourceManager(
+		    ResourceID resourceId,
 			Configuration configuration,
 			HighAvailabilityServices haServices,
 			HeartbeatServices heartbeatServices,
@@ -770,7 +773,7 @@ protected ResourceManagerRunner startResourceManager(
 			ClusterInformation clusterInformation) throws Exception {
 
 		final ResourceManagerRunner resourceManagerRunner = new ResourceManagerRunner(
-			ResourceID.generate(),
+			resourceId,
 			FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + UUID.randomUUID(),
 			configuration,
 			resourceManagerRpcService,
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cebff5881ac..ff498b7ab44 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2517,7 +2517,8 @@ object JobManager {
 
     val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
       metricRegistry,
-      configuration.getString(JobManagerOptions.ADDRESS))
+      configuration.getString(JobManagerOptions.ADDRESS),
+      ResourceID.generate.getResourceIdString)
 
     (instanceManager,
       scheduler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index cb5ec67c97c..8cdb6125c1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -46,7 +47,8 @@
 	@Test
 	public void addAndRemoveJobs() throws Exception {
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
+		String jobManagerId = ResourceID.generate().getResourceIdString();
+		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost", jobManagerId);
 
 		final JobID jid1 = new JobID();
 		final JobID jid2 = new JobID();
@@ -78,7 +80,8 @@ public void addAndRemoveJobs() throws Exception {
 	@Test
 	public void testCloseClosesAll() throws Exception {
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
+		String jobManagerId = ResourceID.generate().getResourceIdString();
+		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost", jobManagerId);
 
 		final JobID jid1 = new JobID();
 		final JobID jid2 = new JobID();
@@ -104,7 +107,8 @@ public void testCloseClosesAll() throws Exception {
 	@Test
 	public void testGenerateScopeDefault() throws Exception {
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
+		String jobManagerId = ResourceID.generate().getResourceIdString();
+		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost", jobManagerId);
 
 		assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
 		assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name"));
@@ -117,8 +121,8 @@ public void testGenerateScopeCustom() throws Exception {
 		Configuration cfg = new Configuration();
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
-
-		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");
+		String jobManagerId = ResourceID.generate().getResourceIdString();
+		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host", jobManagerId);
 
 		assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
 		assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
@@ -129,7 +133,8 @@ public void testGenerateScopeCustom() throws Exception {
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+		String jobManagerId = ResourceID.generate().getResourceIdString();
+		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host", jobManagerId);
 
 		QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());
 		assertEquals("", info.scope);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 6f4751b07d9..b33aabb8080 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -40,8 +41,9 @@
 	@Test
 	public void testGenerateScopeDefault() throws Exception {
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		String jobManagerId = ResourceID.generate().getResourceIdString();
 
-		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
+		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName", jobManagerId);
 		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 
 		assertArrayEquals(
@@ -61,10 +63,11 @@ public void testGenerateScopeCustom() throws Exception {
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
+		String jobManagerId = ResourceID.generate().getResourceIdString();
 
 		JobID jid = new JobID();
 
-		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
+		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName", jobManagerId);
 		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
 
 		assertArrayEquals(
@@ -84,10 +87,11 @@ public void testGenerateScopeCustomWildcard() throws Exception {
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
+		String jobManagerId = ResourceID.generate().getResourceIdString();
 
 		JobID jid = new JobID();
 
-		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
+		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName", jobManagerId);
 		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
 
 		assertArrayEquals(
@@ -105,7 +109,9 @@ public void testGenerateScopeCustomWildcard() throws Exception {
 	public void testCreateQueryServiceMetricInfo() {
 		JobID jid = new JobID();
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+		String jobManagerId = ResourceID.generate().getResourceIdString();
+
+		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host", jobManagerId);
 		JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
 
 		QueryScopeInfo.JobQueryScopeInfo info = jmj.createQueryServiceMetricInfo(new DummyCharacterFilter());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services