You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/15 05:43:33 UTC

[GitHub] zentol closed pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

zentol closed pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
once it is merged, so the diff is reproduced below:

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 89d0cfd3452..ae51a2027e8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -50,6 +50,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -165,7 +166,8 @@ public MesosResourceManager(
 			MesosConfiguration mesosConfig,
 			MesosTaskManagerParameters taskManagerParameters,
 			ContainerSpecification taskManagerContainerSpec,
-			@Nullable String webUiUrl) {
+			@Nullable String webUiUrl,
+			JobManagerMetricGroup jobManagerMetricGroup) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -176,7 +178,8 @@ public MesosResourceManager(
 			metricRegistry,
 			jobLeaderIdService,
 			clusterInformation,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			jobManagerMetricGroup);
 
 		this.mesosServices = Preconditions.checkNotNull(mesosServices);
 		this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
index 9582e9f2e23..f53ffb301a8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -62,7 +63,17 @@ public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnul
 	}
 
 	@Override
-	public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception {
+	public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			ClusterInformation clusterInformation,
+			@Nullable String webInterfaceUrl,
+			JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
 			rmServicesConfiguration,
@@ -85,6 +96,7 @@ public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnul
 			schedulerConfiguration,
 			taskManagerParameters,
 			taskManagerContainerSpec,
-			webInterfaceUrl);
+			webInterfaceUrl,
+			jobManagerMetricGroup);
 	}
 }
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 94bc860a85c..5c9aa468d86 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -57,6 +57,8 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -172,7 +174,8 @@ public TestingMesosResourceManager(
 			MesosServices mesosServices,
 			MesosConfiguration mesosConfig,
 			MesosTaskManagerParameters taskManagerParameters,
-			ContainerSpecification taskManagerContainerSpec) {
+			ContainerSpecification taskManagerContainerSpec,
+			JobManagerMetricGroup jobManagerMetricGroup) {
 			super(
 				rpcService,
 				resourceManagerEndpointId,
@@ -189,7 +192,8 @@ public TestingMesosResourceManager(
 				mesosConfig,
 				taskManagerParameters,
 				taskManagerContainerSpec,
-				null);
+				null,
+				jobManagerMetricGroup);
 		}
 
 		@Override
@@ -298,8 +302,8 @@ protected void closeTaskManagerConnection(ResourceID resourceID, Exception cause
 					mesosServices,
 					rmServices.mesosConfig,
 					tmParams,
-					containerSpecification
-				);
+					containerSpecification,
+					UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 
 			// TaskExecutors
 			task1Executor = mockTaskExecutor(task1);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a1da2131e48..f15ea5bc314 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -53,6 +54,7 @@
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
@@ -230,6 +232,8 @@ public void start() throws Exception {
 
 		submittedJobGraphStore.start(this);
 		leaderElectionService.start(this);
+
+		registerDispatcherMetrics(jobManagerMetricGroup);
 	}
 
 	//------------------------------------------------------
@@ -909,6 +913,11 @@ private void clearDispatcherState() {
 		terminateJobManagerRunners();
 	}
 
+	private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
+		jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS,
+			() -> (long) jobManagerRunnerFutures.size());
+	}
+
 	/**
 	 * Callback method when current resourceManager loses leadership.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 043ccecf422..c09c41bb2ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -149,6 +149,11 @@ public AbstractDispatcherResourceManagerComponentFactory(
 			log.debug("Starting Dispatcher REST endpoint.");
 			webMonitorEndpoint.start();
 
+			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
+				metricRegistry,
+				rpcService.getAddress(),
+				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+
 			resourceManager = resourceManagerFactory.createResourceManager(
 				configuration,
 				ResourceID.generate(),
@@ -158,12 +163,8 @@ public AbstractDispatcherResourceManagerComponentFactory(
 				metricRegistry,
 				fatalErrorHandler,
 				new ClusterInformation(rpcService.getAddress(), blobServer.getPort()),
-				webMonitorEndpoint.getRestBaseUrl());
-
-			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
-				metricRegistry,
-				rpcService.getAddress(),
-				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+				webMonitorEndpoint.getRestBaseUrl(),
+				jobManagerMetricGroup);
 
 			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index c106c391e46..dfa82f15381 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -52,4 +52,9 @@ private MetricNames() {
 	public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark";
 	public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark";
 	public static final String IO_CURRENT_OUTPUT_WATERMARK = "currentOutputWatermark";
+
+	public static final String NUM_RUNNING_JOBS = "numRunningJobs";
+	public static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+	public static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+	public static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers";
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 58341ad496c..3fc7007f9c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,6 +241,10 @@ public void start() throws Exception {
 
 				LOG.info("Starting Metrics Registry");
 				metricRegistry = createMetricRegistry(configuration);
+				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
+					metricRegistry,
+					"localhost",
+					ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
 				final RpcService jobManagerRpcService;
 				final RpcService resourceManagerRpcService;
@@ -309,7 +313,8 @@ public void start() throws Exception {
 					heartbeatServices,
 					metricRegistry,
 					resourceManagerRpcService,
-					new ClusterInformation("localhost", blobServer.getPort()));
+					new ClusterInformation("localhost", blobServer.getPort()),
+					jobManagerMetricGroup);
 
 				blobCacheService = new BlobCacheService(
 					configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
@@ -365,11 +370,6 @@ public void start() throws Exception {
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for JobManger");
 
-				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
-					metricRegistry,
-					"localhost",
-					ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
-
 				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
 
 				dispatcher = new StandaloneDispatcher(
@@ -765,7 +765,8 @@ protected ResourceManagerRunner startResourceManager(
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			RpcService resourceManagerRpcService,
-			ClusterInformation clusterInformation) throws Exception {
+			ClusterInformation clusterInformation,
+			JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
 
 		final ResourceManagerRunner resourceManagerRunner = new ResourceManagerRunner(
 			ResourceID.generate(),
@@ -775,7 +776,8 @@ protected ResourceManagerRunner startResourceManager(
 			haServices,
 			heartbeatServices,
 			metricRegistry,
-			clusterInformation);
+			clusterInformation,
+			jobManagerMetricGroup);
 
 			resourceManagerRunner.start();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b66921fce20..cd4281e11cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -46,7 +46,9 @@
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
@@ -135,6 +137,8 @@
 
 	private final ClusterInformation clusterInformation;
 
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
 	/** The service to elect a ResourceManager leader. */
 	private LeaderElectionService leaderElectionService;
 
@@ -159,7 +163,8 @@ public ResourceManager(
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			ClusterInformation clusterInformation,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler,
+			JobManagerMetricGroup jobManagerMetricGroup) {
 
 		super(rpcService, resourceManagerEndpointId);
 
@@ -170,6 +175,7 @@ public ResourceManager(
 		this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
 		this.clusterInformation = checkNotNull(clusterInformation);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+		this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
 
 		this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
 			resourceId,
@@ -215,6 +221,8 @@ public void start() throws Exception {
 		} catch (Exception e) {
 			throw new ResourceManagerException("Could not start the job leader id service.", e);
 		}
+
+		registerSlotAndTaskExecutorMetrics();
 	}
 
 	@Override
@@ -742,6 +750,18 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
 		}
 	}
 
+	private void registerSlotAndTaskExecutorMetrics() {
+		jobManagerMetricGroup.gauge(
+			MetricNames.TASK_SLOTS_AVAILABLE,
+			() -> (long) slotManager.getNumberFreeSlots());
+		jobManagerMetricGroup.gauge(
+			MetricNames.TASK_SLOTS_TOTAL,
+			() -> (long) slotManager.getNumberRegisteredSlots());
+		jobManagerMetricGroup.gauge(
+			MetricNames.NUM_REGISTERED_TASK_MANAGERS,
+			() -> (long) taskExecutors.size());
+	}
+
 	private void clearStateInternal() {
 		jobManagerRegistrations.clear();
 		jmResourceIdRegistrations.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index 91a7b267594..7f70058e355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -25,6 +25,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -46,5 +47,6 @@
 		MetricRegistry metricRegistry,
 		FatalErrorHandler fatalErrorHandler,
 		ClusterInformation clusterInformation,
-		@Nullable String webInterfaceUrl) throws Exception;
+		@Nullable String webInterfaceUrl,
+		JobManagerMetricGroup jobManagerMetricGroup) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 072c2f2b5e6..495410237e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.AutoCloseableAsync;
@@ -56,7 +57,8 @@ public ResourceManagerRunner(
 			final HighAvailabilityServices highAvailabilityServices,
 			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry,
-			final ClusterInformation clusterInformation) throws Exception {
+			final ClusterInformation clusterInformation,
+			final JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
 
 		Preconditions.checkNotNull(resourceId);
 		Preconditions.checkNotNull(configuration);
@@ -82,7 +84,8 @@ public ResourceManagerRunner(
 			metricRegistry,
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			clusterInformation,
-			this);
+			this,
+			jobManagerMetricGroup);
 	}
 
 	public ResourceManagerGateway getResourceManageGateway() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 064c2d361d2..9b409b3df9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -25,6 +25,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -53,7 +54,8 @@ public StandaloneResourceManager(
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			ClusterInformation clusterInformation,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler,
+			JobManagerMetricGroup jobManagerMetricGroup) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -64,7 +66,8 @@ public StandaloneResourceManager(
 			metricRegistry,
 			jobLeaderIdService,
 			clusterInformation,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			jobManagerMetricGroup);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index c8e314f659c..bfe74041fc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -25,6 +25,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -46,7 +47,8 @@
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
 			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
+			@Nullable String webInterfaceUrl,
+			JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
 		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
 			resourceManagerRuntimeServicesConfiguration,
@@ -63,6 +65,7 @@
 			metricRegistry,
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			clusterInformation,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			jobManagerMetricGroup);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 3c1f3dab246..abc31b3d9d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -46,6 +46,7 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -524,7 +525,8 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
 				metricRegistry,
 				jobLeaderIdService,
 				new ClusterInformation("localhost", 1234),
-				testingFatalErrorHandler);
+				testingFatalErrorHandler,
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 
 			resourceManager.start();
 
@@ -622,7 +624,8 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
 				metricRegistry,
 				jobLeaderIdService,
 				new ClusterInformation("localhost", 1234),
-				testingFatalErrorHandler);
+				testingFatalErrorHandler,
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 
 			resourceManager.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index b8dee83ed46..f2810a83379 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -93,7 +94,8 @@ public void confirmLeaderSessionID(UUID leaderId) {
 				metricRegistry,
 				resourceManagerRuntimeServices.getJobLeaderIdService(),
 				new ClusterInformation("localhost", 1234),
-				testingFatalErrorHandler) {
+				testingFatalErrorHandler,
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup()) {
 
 				@Override
 				public void revokeLeadership() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 3e630e1bef6..b01ff18ed34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -291,7 +292,8 @@ private ResourceManager createAndStartResourceManager(
 			metricRegistry,
 			jobLeaderIdService,
 			new ClusterInformation("localhost", 1234),
-			fatalErrorHandler);
+			fatalErrorHandler,
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 		resourceManager.start();
 		return resourceManager;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 535271ae105..aa70afdf390 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
@@ -212,7 +213,8 @@ private StandaloneResourceManager createAndStartResourceManager(LeaderElectionSe
 				metricRegistry,
 				jobLeaderIdService,
 				new ClusterInformation("localhost", 1234),
-				fatalErrorHandler);
+				fatalErrorHandler,
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 
 		resourceManager.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index a1f6227be02..08be4270480 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
@@ -90,7 +91,8 @@ public void testRequestTaskManagerInfo() throws Exception {
 			slotManager,
 			NoOpMetricRegistry.INSTANCE,
 			jobLeaderIdService,
-			testingFatalErrorHandler);
+			testingFatalErrorHandler,
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 
 		resourceManager.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index e8207019a32..3fdc8ce2c20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -25,6 +25,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -49,7 +50,8 @@ public TestingResourceManager(
 			SlotManager slotManager,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler,
+			JobManagerMetricGroup jobManagerMetricGroup) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -60,7 +62,8 @@ public TestingResourceManager(
 			metricRegistry,
 			jobLeaderIdService,
 			new ClusterInformation("localhost", 1234),
-			fatalErrorHandler);
+			fatalErrorHandler,
+			jobManagerMetricGroup);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 1081fd159dd..d2b976f9a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -149,7 +149,8 @@ public void testSlotAllocation() throws Exception {
 			metricRegistry,
 			jobLeaderIdService,
 			new ClusterInformation("localhost", 1234),
-			testingFatalErrorHandler);
+			testingFatalErrorHandler,
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskManagerLocation(taskManagerLocation)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 09af236f65a..e97966c4d14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 
@@ -222,6 +223,7 @@ public static Builder newBuilder() {
 		protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
 		protected Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
 		protected Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
+		protected Supplier<CompletableFuture<JobsOverview>> requestOverviewForAllJobsSupplier;
 		protected Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
 		protected Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
 		protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
new file mode 100644
index 00000000000..bd70cadda0f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase extends TestLogger {
+
+	private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager.";
+
+	private static final BlockerSync sync = new BlockerSync();
+
+	private CheckedThread jobExecuteThread;
+
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build());
+
+	@Before
+	public void setUp() throws Exception {
+		jobExecuteThread = new CheckedThread() {
+
+			@Override
+			public void go() throws Exception {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				env.addSource(new SourceFunction<String>() {
+
+					@Override
+					public void run(SourceContext<String> ctx) throws Exception {
+						sync.block();
+					}
+
+					@Override
+					public void cancel() {
+						sync.releaseBlocker();
+					}
+
+				}).addSink(new PrintSinkFunction());
+
+				env.execute();
+			}
+
+		};
+
+		jobExecuteThread.start();
+		sync.awaitBlocker();
+	}
+
+	@Test
+	public void testJobManagerMetrics() throws Exception {
+		assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+		TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
+
+		List<String> expectedPatterns = getExpectedPatterns();
+
+		Collection<String> gaugeNames = reporter.getGauges().values();
+
+		for (String expectedPattern : expectedPatterns) {
+			boolean found = false;
+			for (String gaugeName : gaugeNames) {
+				if (gaugeName.matches(expectedPattern)) {
+					found = true;
+				}
+			}
+			if (!found) {
+				fail(String.format("Failed to find gauge [%s] in registered gauges [%s]",
+					expectedPattern, gaugeNames));
+			}
+		}
+
+		for (Map.Entry<Gauge<?>, String> entry : reporter.getGauges().entrySet()) {
+			if (entry.getValue().contains(MetricNames.TASK_SLOTS_AVAILABLE)) {
+				assertEquals(0L, entry.getKey().getValue());
+			} else if (entry.getValue().contains(MetricNames.TASK_SLOTS_TOTAL)) {
+				assertEquals(1L, entry.getKey().getValue());
+			} else if (entry.getValue().contains(MetricNames.NUM_REGISTERED_TASK_MANAGERS)) {
+				assertEquals(1L, entry.getKey().getValue());
+			} else if (entry.getValue().contains(MetricNames.NUM_RUNNING_JOBS)) {
+				assertEquals(1L, entry.getKey().getValue());
+			}
+		}
+
+		sync.releaseBlocker();
+		jobExecuteThread.sync();
+	}
+
+	private static Configuration getConfiguration() {
+		Configuration configuration = new Configuration();
+		configuration.setString("metrics.reporter.test_reporter.class", TestReporter.class.getName());
+		return configuration;
+	}
+
+	private static List<String> getExpectedPatterns() {
+		String[] expectedGauges = new String[]{
+			MetricNames.TASK_SLOTS_AVAILABLE,
+			MetricNames.TASK_SLOTS_TOTAL,
+			MetricNames.NUM_REGISTERED_TASK_MANAGERS,
+			MetricNames.NUM_RUNNING_JOBS
+		};
+
+		List<String> patterns = new ArrayList<>();
+		for (String expectedGauge : expectedGauges) {
+			patterns.add(JOB_MANAGER_METRICS_PREFIX + expectedGauge);
+		}
+
+		return patterns;
+	}
+
+	/**
+	 * Test metric reporter that exposes registered metrics.
+	 */
+	public static final class TestReporter extends AbstractReporter {
+		public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
+
+		@Override
+		public String filterCharacters(String input) {
+			return input;
+		}
+
+		@Override
+		public void open(MetricConfig config) {
+			OPENED_REPORTERS.add(this);
+		}
+
+		@Override
+		public void close() {
+			OPENED_REPORTERS.remove(this);
+		}
+
+		public Map<Gauge<?>, String> getGauges() {
+			return gauges;
+		}
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 2a43b8b7f1a..6ff5cd66487 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -32,6 +32,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -131,7 +132,8 @@ public YarnResourceManager(
 			JobLeaderIdService jobLeaderIdService,
 			ClusterInformation clusterInformation,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String webInterfaceUrl) {
+			@Nullable String webInterfaceUrl,
+			JobManagerMetricGroup jobManagerMetricGroup) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -142,7 +144,8 @@ public YarnResourceManager(
 			metricRegistry,
 			jobLeaderIdService,
 			clusterInformation,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			jobManagerMetricGroup);
 		this.flinkConfig  = flinkConfig;
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index bfd1b4a07ed..fc7c4e48e6a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -51,7 +52,8 @@
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
 			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
+			@Nullable String webInterfaceUrl,
+			JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
 			rmServicesConfiguration,
@@ -71,6 +73,7 @@
 			rmRuntimeServices.getJobLeaderIdService(),
 			clusterInformation,
 			fatalErrorHandler,
-			webInterfaceUrl);
+			webInterfaceUrl,
+			jobManagerMetricGroup);
 	}
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index a7d4f43672d..d41d42d7a05 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -39,6 +39,8 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -177,7 +179,8 @@ public void teardown() throws Exception {
 				FatalErrorHandler fatalErrorHandler,
 				@Nullable String webInterfaceUrl,
 				AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient,
-				NMClient mockNMClient) {
+				NMClient mockNMClient,
+				JobManagerMetricGroup jobManagerMetricGroup) {
 			super(
 				rpcService,
 				resourceManagerEndpointId,
@@ -191,7 +194,8 @@ public void teardown() throws Exception {
 				jobLeaderIdService,
 				clusterInformation,
 				fatalErrorHandler,
-				webInterfaceUrl);
+				webInterfaceUrl,
+				jobManagerMetricGroup);
 			this.mockNMClient = mockNMClient;
 			this.mockResourceManagerClient = mockResourceManagerClient;
 		}
@@ -248,6 +252,8 @@ protected void runAsync(final Runnable runnable) {
 		public NMClient mockNMClient = mock(NMClient.class);
 		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
 				mock(AMRMClientAsync.class);
+		public JobManagerMetricGroup mockJMMetricGroup =
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
 
 		/**
 		 * Create mock RM dependencies.
@@ -274,7 +280,8 @@ protected void runAsync(final Runnable runnable) {
 							testingFatalErrorHandler,
 							null,
 							mockResourceManagerClient,
-							mockNMClient);
+							mockNMClient,
+							mockJMMetricGroup);
 		}
 
 		/**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services