You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 14:49:50 UTC
[4/4] flink git commit: [FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway
[FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway
Add more logging to MetricFetcher
This closes #4852.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6f3090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6f3090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6f3090
Branch: refs/heads/master
Commit: 9f6f30905fcae3ad415f4c37203fb2a94c793334
Parents: 1809cad
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 18 13:50:06 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 1 15:48:00 2017 +0100
----------------------------------------------------------------------
.../runtime/akka/AkkaJobManagerGateway.java | 31 ++++++++
.../flink/runtime/dispatcher/Dispatcher.java | 33 ++++++++
.../runtime/jobmaster/JobManagerRunner.java | 4 +
.../flink/runtime/metrics/MetricRegistry.java | 23 ++++++
.../resourcemanager/ResourceManager.java | 21 +++++
.../resourcemanager/ResourceManagerGateway.java | 11 +++
.../handler/legacy/metrics/MetricFetcher.java | 83 ++++++++++++--------
.../flink/runtime/rpc/akka/AkkaRpcService.java | 4 +
.../runtime/webmonitor/RestfulGateway.java | 20 +++++
.../retriever/MetricQueryServiceGateway.java | 2 +
.../retriever/impl/AkkaQueryServiceGateway.java | 5 ++
.../legacy/metrics/MetricFetcherTest.java | 23 +++---
12 files changed, 212 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 0a2d4d6..6896852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.akka;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -36,14 +37,17 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
+import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
import scala.Option;
import scala.reflect.ClassTag$;
@@ -252,6 +256,33 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
}
@Override
+ public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+ final String jobManagerPath = getAddress();
+ final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+
+ return CompletableFuture.completedFuture(
+ Collections.singleton(jobManagerMetricQueryServicePath));
+ }
+
+ @Override
+ public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+ return requestTaskManagerInstances(timeout)
+ .thenApply(
+ (Collection<Instance> instances) ->
+ instances
+ .stream()
+ .map(
+ (Instance instance) -> {
+ final String taskManagerAddress = instance.getTaskManagerGateway().getAddress();
+ final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+ MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();
+
+ return Tuple2.of(instance.getId(), taskManagerMetricQuerServicePath);
+ })
+ .collect(Collectors.toList()));
+ }
+
+ @Override
public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index efaebb1..dda0275 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobSubmissionException;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -51,13 +53,17 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import akka.actor.ActorSystem;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -157,6 +163,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
try {
+ metricRegistry.shutdown();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ try {
super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -171,6 +183,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
public void start() throws Exception {
super.start();
+ // start the MetricQueryService
+ // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
+ final ActorSystem actorSystem = ((AkkaRpcService) getRpcService()).getActorSystem();
+ metricRegistry.startQueryService(actorSystem, null);
+
leaderElectionService.start(this);
}
@@ -356,6 +373,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
@Override
+ public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+ final String metricQueryServicePath = metricRegistry.getMetricQueryServicePath();
+
+ if (metricQueryServicePath != null) {
+ return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
+ } else {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ }
+
+ @Override
+ public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+ return resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
+ }
+
+ @Override
public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 70abf2f..14baa6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -178,6 +178,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
}
}
+ //----------------------------------------------------------------------------------------------
+ // Getter
+ //----------------------------------------------------------------------------------------------
+
public JobMasterGateway getJobManagerGateway() {
return jobManager.getSelfGateway(JobMasterGateway.class);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 2e07370..278292d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
@@ -43,6 +44,8 @@ import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
@@ -65,8 +68,13 @@ public class MetricRegistry {
private List<MetricReporter> reporters;
private ScheduledExecutorService executor;
+
+ @Nullable
private ActorRef queryService;
+ @Nullable
+ private String metricQueryServicePath;
+
private ViewUpdater viewUpdater;
private final ScopeFormats scopeFormats;
@@ -87,6 +95,9 @@ public class MetricRegistry {
this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
+ this.queryService = null;
+ this.metricQueryServicePath = null;
+
if (reporterConfigurations.isEmpty()) {
// no reporters defined
// by default, don't report anything
@@ -165,6 +176,7 @@ public class MetricRegistry {
try {
queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+ metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
} catch (Exception e) {
LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
}
@@ -172,6 +184,16 @@ public class MetricRegistry {
}
/**
+ * Returns the address under which the {@link MetricQueryService} is reachable.
+ *
+ * @return address of the metric query service
+ */
+ @Nullable
+ public String getMetricQueryServicePath() {
+ return metricQueryServicePath;
+ }
+
+ /**
* Returns the global delimiter.
*
* @return global delimiter
@@ -368,6 +390,7 @@ public class MetricRegistry {
// ------------------------------------------------------------------------
@VisibleForTesting
+ @Nullable
public ActorRef getQueryService() {
return queryService;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 38cfd6e..1d4d4f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
@@ -59,6 +61,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -493,6 +497,23 @@ public abstract class ResourceManager<WorkerType extends Serializable>
numberFreeSlots));
}
+ @Override
+ public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+ final ArrayList<Tuple2<InstanceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
+
+ for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) {
+ final ResourceID tmResourceId = workerRegistrationEntry.getKey();
+ final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue();
+ final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress();
+ final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+ MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString();
+
+ metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), tmMetricQueryServicePath));
+ }
+
+ return CompletableFuture.completedFuture(metricQueryServicePaths);
+ }
+
// ------------------------------------------------------------------------
// Internal methods
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index e0674b6..9eacb4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -34,6 +36,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
@@ -166,4 +169,12 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
* @return Future containing the resource overview
*/
CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout Time timeout);
+
+ /**
+ * Requests the paths for the TaskManager's {@link MetricQueryService} to query.
+ *
+ * @param timeout for the asynchronous operation
+ * @return Future containing the collection of instance ids and the corresponding metric query service path
+ */
+ CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index fa71c68..1bfb9f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -19,12 +19,12 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -49,10 +49,10 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
* <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
* the last call has passed.
*/
-public class MetricFetcher {
+public class MetricFetcher<T extends RestfulGateway> {
private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
- private final GatewayRetriever<JobManagerGateway> retriever;
+ private final GatewayRetriever<T> retriever;
private final MetricQueryServiceRetriever queryServiceRetriever;
private final Executor executor;
private final Time timeout;
@@ -63,7 +63,7 @@ public class MetricFetcher {
private long lastUpdateTime;
public MetricFetcher(
- GatewayRetriever<JobManagerGateway> retriever,
+ GatewayRetriever<T> retriever,
MetricQueryServiceRetriever queryServiceRetriever,
Executor executor,
Time timeout) {
@@ -96,15 +96,20 @@ public class MetricFetcher {
}
private void fetchMetrics() {
+ LOG.debug("Start fetching metrics.");
+
try {
- Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
- if (optJobManagerGateway.isPresent()) {
- final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
+ Optional<T> optionalLeaderGateway = retriever.getNow();
+ if (optionalLeaderGateway.isPresent()) {
+ final T leaderGateway = optionalLeaderGateway.get();
/**
* Remove all metrics that belong to a job that is not running and no longer archived.
*/
- CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(
+ true,
+ true,
+ timeout);
jobDetailsFuture.whenCompleteAsync(
(MultipleJobsDetails jobDetails, Throwable throwable) -> {
@@ -123,35 +128,41 @@ public class MetricFetcher {
},
executor);
- String jobManagerPath = jobManagerGateway.getAddress();
- String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-
- retrieveAndQueryMetrics(jmQueryServicePath);
+ CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
- /**
- * We first request the list of all registered task managers from the job manager, and then
- * request the respective metric dump from each task manager.
- *
- * <p>All stored metrics that do not belong to a registered task manager will be removed.
- */
- CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
- taskManagersFuture.whenCompleteAsync(
- (Collection<Instance> taskManagers, Throwable throwable) -> {
+ queryServicePathsFuture.whenCompleteAsync(
+ (Collection<String> queryServicePaths, Throwable throwable) -> {
if (throwable != null) {
- LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
+ LOG.warn("Requesting paths for query services failed.", throwable);
} else {
- List<String> activeTaskManagers = taskManagers.stream().map(
- taskManagerInstance -> {
- final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
- final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
-
- retrieveAndQueryMetrics(tmQueryServicePath);
+ for (String queryServicePath : queryServicePaths) {
+ retrieveAndQueryMetrics(queryServicePath);
+ }
+ }
+ },
+ executor);
- return taskManagerInstance.getId().toString();
- }).collect(Collectors.toList());
+ // TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
+ // TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that
+ // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
+ CompletableFuture<Collection<Tuple2<InstanceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
+ .requestTaskManagerMetricQueryServicePaths(timeout);
- metrics.retainTaskManagers(activeTaskManagers);
+ taskManagerQueryServicePathsFuture.whenCompleteAsync(
+ (Collection<Tuple2<InstanceID, String>> queryServicePaths, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
+ } else {
+ List<String> taskManagersToRetain = queryServicePaths
+ .stream()
+ .map(
+ (Tuple2<InstanceID, String> tuple) -> {
+ retrieveAndQueryMetrics(tuple.f1);
+ return tuple.f0.toString();
+ }
+ ).collect(Collectors.toList());
+
+ metrics.retainTaskManagers(taskManagersToRetain);
}
},
executor);
@@ -167,6 +178,8 @@ public class MetricFetcher {
* @param queryServicePath specifying the QueryServiceGateway
*/
private void retrieveAndQueryMetrics(String queryServicePath) {
+ LOG.debug("Retrieve metric query service gateway for {}", queryServicePath);
+
final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
queryServiceGatewayFuture.whenCompleteAsync(
@@ -186,6 +199,8 @@ public class MetricFetcher {
* @param queryServiceGateway to query for metrics
*/
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
+ LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
+
queryServiceGateway
.queryMetrics(timeout)
.whenCompleteAsync(
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 4ad7e70..68b5aaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -133,6 +133,10 @@ public class AkkaRpcService implements RpcService {
stopped = false;
}
+ public ActorSystem getActorSystem() {
+ return actorSystem;
+ }
+
@Override
public String getAddress() {
return address;
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index b2fc026..d871b06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
@@ -76,4 +80,20 @@ public interface RestfulGateway extends RpcGateway {
* @return Future containing the status overview
*/
CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout Time timeout);
+
+ /**
+ * Requests the paths for the {@link MetricQueryService} to query.
+ *
+ * @param timeout for the asynchronous operation
+ * @return Future containing the collection of metric query service paths to query
+ */
+ CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+ /**
+ * Requests the paths for the TaskManager's {@link MetricQueryService} to query.
+ *
+ * @param timeout for the asynchronous operation
+ * @return Future containing the collection of instance ids and the corresponding metric query service path
+ */
+ CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
index c79bf5d..8368993 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
@@ -33,4 +33,6 @@ import java.util.concurrent.CompletableFuture;
public interface MetricQueryServiceGateway {
CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);
+
+ String getAddress();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
index 8985205..c53dce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
@@ -50,4 +50,9 @@ public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {
.mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
);
}
+
+ @Override
+ public String getAddress() {
+ return queryServiceActorRef.path().toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index a6eaf2f..ce98f31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -28,9 +28,7 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -76,25 +74,22 @@ public class MetricFetcherTest extends TestLogger {
JobID jobID = new JobID();
InstanceID tmID = new InstanceID();
ResourceID tmRID = new ResourceID(tmID.toString());
- TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
- when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
- Instance taskManager = mock(Instance.class);
- when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
- when(taskManager.getId()).thenReturn(tmID);
- when(taskManager.getTaskManagerID()).thenReturn(tmRID);
// ========= setup JobManager ==================================================================================
JobDetails details = mock(JobDetails.class);
when(details.getJobId()).thenReturn(jobID);
+ final String jmMetricQueryServicePath = "/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+ final String tmMetricQueryServicePath = "/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString();
+
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
- when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
- when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
+ when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
+ when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmID, tmMetricQueryServicePath))));
GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
when(retriever.getNow())
@@ -112,8 +107,8 @@ public class MetricFetcherTest extends TestLogger {
.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
- when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
- when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+ when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+ when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
// ========= start MetricFetcher testing =======================================================================
MetricFetcher fetcher = new MetricFetcher(