You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 14:49:50 UTC

[4/4] flink git commit: [FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway

[FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway

Add more logging to MetricFetcher

This closes #4852.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6f3090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6f3090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6f3090

Branch: refs/heads/master
Commit: 9f6f30905fcae3ad415f4c37203fb2a94c793334
Parents: 1809cad
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 18 13:50:06 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 1 15:48:00 2017 +0100

----------------------------------------------------------------------
 .../runtime/akka/AkkaJobManagerGateway.java     | 31 ++++++++
 .../flink/runtime/dispatcher/Dispatcher.java    | 33 ++++++++
 .../runtime/jobmaster/JobManagerRunner.java     |  4 +
 .../flink/runtime/metrics/MetricRegistry.java   | 23 ++++++
 .../resourcemanager/ResourceManager.java        | 21 +++++
 .../resourcemanager/ResourceManagerGateway.java | 11 +++
 .../handler/legacy/metrics/MetricFetcher.java   | 83 ++++++++++++--------
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  4 +
 .../runtime/webmonitor/RestfulGateway.java      | 20 +++++
 .../retriever/MetricQueryServiceGateway.java    |  2 +
 .../retriever/impl/AkkaQueryServiceGateway.java |  5 ++
 .../legacy/metrics/MetricFetcherTest.java       | 23 +++---
 12 files changed, 212 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 0a2d4d6..6896852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.akka;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -36,14 +37,17 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.reflect.ClassTag$;
@@ -252,6 +256,33 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
+	public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+		final String jobManagerPath = getAddress();
+		final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+
+		return CompletableFuture.completedFuture(
+			Collections.singleton(jobManagerMetricQueryServicePath));
+	}
+
+	@Override
+	public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		return requestTaskManagerInstances(timeout)
+			.thenApply(
+				(Collection<Instance> instances) ->
+					instances
+						.stream()
+						.map(
+							(Instance instance) -> {
+								final String taskManagerAddress = instance.getTaskManagerGateway().getAddress();
+								final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+									MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();
+
+								return Tuple2.of(instance.getId(), taskManagerMetricQuerServicePath);
+							})
+						.collect(Collectors.toList()));
+	}
+
+	@Override
 	public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index efaebb1..dda0275 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -51,13 +53,17 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -157,6 +163,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 
 		try {
+			metricRegistry.shutdown();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
 			super.postStop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -171,6 +183,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	public void start() throws Exception {
 		super.start();
 
+		// start the MetricQueryService
+		// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
+		final ActorSystem actorSystem = ((AkkaRpcService) getRpcService()).getActorSystem();
+		metricRegistry.startQueryService(actorSystem, null);
+
 		leaderElectionService.start(this);
 	}
 
@@ -356,6 +373,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	@Override
+	public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+		final String metricQueryServicePath = metricRegistry.getMetricQueryServicePath();
+
+		if (metricQueryServicePath != null) {
+			return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
+		} else {
+			return CompletableFuture.completedFuture(Collections.emptyList());
+		}
+	}
+
+	@Override
+	public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		return resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
+	}
+
+	@Override
 	public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
 		return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 70abf2f..14baa6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -178,6 +178,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 		}
 	}
 
+	//----------------------------------------------------------------------------------------------
+	// Getter
+	//----------------------------------------------------------------------------------------------
+
 	public JobMasterGateway getJobManagerGateway() {
 		return jobManager.getSelfGateway(JobMasterGateway.class);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 2e07370..278292d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.View;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
@@ -43,6 +44,8 @@ import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimerTask;
@@ -65,8 +68,13 @@ public class MetricRegistry {
 
 	private List<MetricReporter> reporters;
 	private ScheduledExecutorService executor;
+
+	@Nullable
 	private ActorRef queryService;
 
+	@Nullable
+	private String metricQueryServicePath;
+
 	private ViewUpdater viewUpdater;
 
 	private final ScopeFormats scopeFormats;
@@ -87,6 +95,9 @@ public class MetricRegistry {
 
 		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
 
+		this.queryService = null;
+		this.metricQueryServicePath = null;
+
 		if (reporterConfigurations.isEmpty()) {
 			// no reporters defined
 			// by default, don't report anything
@@ -165,6 +176,7 @@ public class MetricRegistry {
 
 			try {
 				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
 			} catch (Exception e) {
 				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
 			}
@@ -172,6 +184,16 @@ public class MetricRegistry {
 	}
 
 	/**
+	 * Returns the address under which the {@link MetricQueryService} is reachable.
+	 *
+	 * @return address of the metric query service, or {@code null} if the service has not been (successfully) started
+	 */
+	@Nullable
+	public String getMetricQueryServicePath() {
+		return metricQueryServicePath;
+	}
+
+	/**
 	 * Returns the global delimiter.
 	 *
 	 * @return global delimiter
@@ -368,6 +390,7 @@ public class MetricRegistry {
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
+	@Nullable
 	public ActorRef getQueryService() {
 		return queryService;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 38cfd6e..1d4d4f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
@@ -59,6 +61,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -493,6 +497,23 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				numberFreeSlots));
 	}
 
+	@Override
+	public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		final ArrayList<Tuple2<InstanceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
+
+		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
+			final ResourceID tmResourceId = workerRegistrationEntry.getKey();
+			final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue();
+			final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress();
+			final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+				MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString();
+
+			metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), tmMetricQueryServicePath));
+		}
+
+		return CompletableFuture.completedFuture(metricQueryServicePaths);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Internal methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index e0674b6..9eacb4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -34,6 +36,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -166,4 +169,12 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	 * @return Future containing the resource overview
 	 */
 	CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout Time timeout);
+
+	/**
+	 * Requests the paths of the TaskManagers' {@link MetricQueryService}s to query.
+	 *
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing a collection of (instance id, metric query service path) pairs
+	 */
+	CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index fa71c68..1bfb9f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -49,10 +49,10 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
  * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
  * the last call has passed.
  */
-public class MetricFetcher {
+public class MetricFetcher<T extends RestfulGateway> {
 	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
 
-	private final GatewayRetriever<JobManagerGateway> retriever;
+	private final GatewayRetriever<T> retriever;
 	private final MetricQueryServiceRetriever queryServiceRetriever;
 	private final Executor executor;
 	private final Time timeout;
@@ -63,7 +63,7 @@ public class MetricFetcher {
 	private long lastUpdateTime;
 
 	public MetricFetcher(
-			GatewayRetriever<JobManagerGateway> retriever,
+			GatewayRetriever<T> retriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Executor executor,
 			Time timeout) {
@@ -96,15 +96,20 @@ public class MetricFetcher {
 	}
 
 	private void fetchMetrics() {
+		LOG.debug("Start fetching metrics.");
+
 		try {
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
-			if (optJobManagerGateway.isPresent()) {
-				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
+			Optional<T> optionalLeaderGateway = retriever.getNow();
+			if (optionalLeaderGateway.isPresent()) {
+				final T leaderGateway = optionalLeaderGateway.get();
 
 				/**
 				 * Remove all metrics that belong to a job that is not running and no longer archived.
 				 */
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(
+					true,
+					true,
+					timeout);
 
 				jobDetailsFuture.whenCompleteAsync(
 					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
@@ -123,35 +128,41 @@ public class MetricFetcher {
 					},
 					executor);
 
-				String jobManagerPath = jobManagerGateway.getAddress();
-				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-
-				retrieveAndQueryMetrics(jmQueryServicePath);
+				CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
 
-				/**
-				 * We first request the list of all registered task managers from the job manager, and then
-				 * request the respective metric dump from each task manager.
-				 *
-				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
-				 */
-				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-				taskManagersFuture.whenCompleteAsync(
-					(Collection<Instance> taskManagers, Throwable throwable) -> {
+				queryServicePathsFuture.whenCompleteAsync(
+					(Collection<String> queryServicePaths, Throwable throwable) -> {
 						if (throwable != null) {
-							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
+							LOG.warn("Requesting paths for query services failed.", throwable);
 						} else {
-							List<String> activeTaskManagers = taskManagers.stream().map(
-								taskManagerInstance -> {
-									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
-									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
-
-									retrieveAndQueryMetrics(tmQueryServicePath);
+							for (String queryServicePath : queryServicePaths) {
+								retrieveAndQueryMetrics(queryServicePath);
+							}
+						}
+					},
+					executor);
 
-									return taskManagerInstance.getId().toString();
-								}).collect(Collectors.toList());
+				// TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
+				// TODO: and return it as part of requestMetricQueryServicePaths. Moreover, change the MetricStore such that
+				// TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
+				CompletableFuture<Collection<Tuple2<InstanceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
+					.requestTaskManagerMetricQueryServicePaths(timeout);
 
-							metrics.retainTaskManagers(activeTaskManagers);
+				taskManagerQueryServicePathsFuture.whenCompleteAsync(
+					(Collection<Tuple2<InstanceID, String>> queryServicePaths, Throwable throwable) -> {
+						if (throwable != null) {
+							LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
+						} else {
+							List<String> taskManagersToRetain = queryServicePaths
+								.stream()
+								.map(
+									(Tuple2<InstanceID, String> tuple) -> {
+										retrieveAndQueryMetrics(tuple.f1);
+										return tuple.f0.toString();
+									}
+								).collect(Collectors.toList());
+
+							metrics.retainTaskManagers(taskManagersToRetain);
 						}
 					},
 					executor);
@@ -167,6 +178,8 @@ public class MetricFetcher {
 	 * @param queryServicePath specifying the QueryServiceGateway
 	 */
 	private void retrieveAndQueryMetrics(String queryServicePath) {
+		LOG.debug("Retrieve metric query service gateway for {}", queryServicePath);
+
 		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
 
 		queryServiceGatewayFuture.whenCompleteAsync(
@@ -186,6 +199,8 @@ public class MetricFetcher {
 	 * @param queryServiceGateway to query for metrics
 	 */
 	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
+		LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
+
 		queryServiceGateway
 			.queryMetrics(timeout)
 			.whenCompleteAsync(

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 4ad7e70..68b5aaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -133,6 +133,10 @@ public class AkkaRpcService implements RpcService {
 		stopped = false;
 	}
 
+	public ActorSystem getActorSystem() {
+		return actorSystem;
+	}
+
 	@Override
 	public String getAddress() {
 		return address;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index b2fc026..d871b06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -76,4 +80,20 @@ public interface RestfulGateway extends RpcGateway {
 	 * @return Future containing the status overview
 	 */
 	CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout Time timeout);
+
+	/**
+	 * Requests the paths for the {@link MetricQueryService} to query.
+	 *
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing the collection of metric query service paths to query
+	 */
+	CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+	/**
+	 * Requests the paths of the TaskManagers' {@link MetricQueryService}s to query.
+	 *
+	 * @param timeout for the asynchronous operation
+	 * @return Future containing a collection of (instance id, metric query service path) pairs
+	 */
+	CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
index c79bf5d..8368993 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
@@ -33,4 +33,6 @@ import java.util.concurrent.CompletableFuture;
 public interface MetricQueryServiceGateway {
 
 	CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);
+
+	String getAddress();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
index 8985205..c53dce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
@@ -50,4 +50,9 @@ public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {
 				.mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
 		);
 	}
+
+	@Override
+	public String getAddress() {
+		return queryServiceActorRef.path().toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index a6eaf2f..ce98f31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -28,9 +28,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -76,25 +74,22 @@ public class MetricFetcherTest extends TestLogger {
 		JobID jobID = new JobID();
 		InstanceID tmID = new InstanceID();
 		ResourceID tmRID = new ResourceID(tmID.toString());
-		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
-		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
-		Instance taskManager = mock(Instance.class);
-		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-		when(taskManager.getId()).thenReturn(tmID);
-		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
 
 		// ========= setup JobManager ==================================================================================
 		JobDetails details = mock(JobDetails.class);
 		when(details.getJobId()).thenReturn(jobID);
 
+		final String jmMetricQueryServicePath = "/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+		final String tmMetricQueryServicePath = "/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString();
+
 		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 
 		when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
-		when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
-		when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
+		when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
+			CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
+		when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
+			CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmID, tmMetricQueryServicePath))));
 
 		GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
 		when(retriever.getNow())
@@ -112,8 +107,8 @@ public class MetricFetcherTest extends TestLogger {
 			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
 
 		MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
-		when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
-		when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+		when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+		when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
 		// ========= start MetricFetcher testing =======================================================================
 		MetricFetcher fetcher = new MetricFetcher(