You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 21:49:59 UTC
[flink] 01/03: [hotfix] Introduce MetricFetcher interface
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 992b65af7420ebacf8aa02b26269190919ce3ccd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 11 15:30:01 2019 +0100
[hotfix] Introduce MetricFetcher interface
Rename MetricFetcher to MetricFetcherImpl and introduce a MetricFetcher interface.
This allows for better testing and hides the type parameter of MetricFetcherImpl.
---
.../flink/docs/rest/RestAPIDocGenerator.java | 1 +
.../runtime/webmonitor/WebRuntimeMonitor.java | 8 +-
.../runtime/dispatcher/DispatcherRestEndpoint.java | 1 +
...tDispatcherResourceManagerComponentFactory.java | 11 +-
.../flink/runtime/minicluster/MiniCluster.java | 17 +-
.../flink/runtime/rest/JobRestEndpointFactory.java | 1 +
.../flink/runtime/rest/RestEndpointFactory.java | 1 +
.../runtime/rest/SessionRestEndpointFactory.java | 1 +
.../rest/handler/job/JobDetailsHandler.java | 8 +-
.../rest/handler/job/JobVertexDetailsHandler.java | 6 +-
.../handler/job/JobVertexTaskManagersHandler.java | 6 +-
.../job/SubtaskCurrentAttemptDetailsHandler.java | 4 +-
.../job/SubtaskExecutionAttemptDetailsHandler.java | 6 +-
.../metrics/AbstractAggregatingMetricsHandler.java | 4 +-
.../job/metrics/AggregatingJobsMetricsHandler.java | 2 +-
.../metrics/AggregatingSubtasksMetricsHandler.java | 2 +-
.../AggregatingTaskManagersMetricsHandler.java | 2 +-
.../rest/handler/legacy/metrics/MetricFetcher.java | 182 +--------------------
.../{MetricFetcher.java => MetricFetcherImpl.java} | 35 +++-
.../handler/legacy/metrics/VoidMetricFetcher.java | 38 +++++
.../rest/handler/util/MutableIOMetrics.java | 1 +
.../runtime/webmonitor/WebMonitorEndpoint.java | 11 +-
.../SubtaskCurrentAttemptDetailsHandlerTest.java | 3 +-
.../SubtaskExecutionAttemptDetailsHandlerTest.java | 3 +-
.../metrics/AggregatingJobsMetricsHandlerTest.java | 2 +-
.../metrics/AggregatingMetricsHandlerTestBase.java | 5 +-
.../AggregatingSubtasksMetricsHandlerTest.java | 2 +-
.../AggregatingTaskManagersMetricsHandlerTest.java | 2 +-
.../handler/legacy/metrics/MetricFetcherTest.java | 2 +-
29 files changed, 138 insertions(+), 229 deletions(-)
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 98eaf39..891ae08 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index d0499ef..826b93a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -101,7 +102,6 @@ public class WebRuntimeMonitor implements WebMonitor {
private AtomicBoolean cleanedUp = new AtomicBoolean();
-
private MetricFetcher metricFetcher;
public WebRuntimeMonitor(
@@ -193,7 +193,11 @@ public class WebRuntimeMonitor implements WebMonitor {
} else {
sslFactory = null;
}
- metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
+ metricFetcher = new MetricFetcherImpl<>(
+ retriever,
+ queryServiceRetriever,
+ scheduledExecutor,
+ timeout);
Router router = new Router();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c6c060d..b36fd6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 5059476..8b593fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -61,6 +61,7 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
/**
* Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
@@ -128,15 +129,17 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
10,
Time.milliseconds(50L));
+ final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
+ configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
+ configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+ "DispatcherRestEndpoint");
+
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
- WebMonitorEndpoint.createExecutorService(
- configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
- configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
- "DispatcherRestEndpoint"),
+ executor,
metricQueryServiceRetriever,
highAvailabilityServices.getWebMonitorLeaderElectionService(),
fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1da7827..92738ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,6 +97,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -347,6 +348,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
20,
Time.milliseconds(20L));
+ final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
+ configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
+ configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
+ "DispatcherRestEndpoint");
+
this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
@@ -354,24 +360,21 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
RestHandlerConfiguration.fromConfiguration(configuration),
resourceManagerGatewayRetriever,
blobServer.getTransientBlobService(),
- WebMonitorEndpoint.createExecutorService(
- configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
- configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
- "DispatcherRestEndpoint"),
+ executor,
new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
haServices.getWebMonitorLeaderElectionService(),
new ShutDownFatalErrorHandler());
- dispatcherRestEndpoint.start();
+ this.dispatcherRestEndpoint.start();
- restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
+ restAddressURI = new URI(this.dispatcherRestEndpoint.getRestBaseUrl());
// bring up the dispatcher that launches JobManagers when jobs submitted
LOG.info("Starting job dispatcher(s) for JobManger");
- final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
+ final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, this.dispatcherRestEndpoint);
dispatcher = new StandaloneDispatcher(
jobManagerRpcService,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index 9bfc9ac..dec7768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index 64750e7..ff10be6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 4669745..2dcf6de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 2a0e1fc..aefe724 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executor;
*/
public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist {
- private final MetricFetcher<?> metricFetcher;
+ private final MetricFetcher metricFetcher;
public JobDetailsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -68,7 +68,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
- MetricFetcher<?> metricFetcher) {
+ MetricFetcher metricFetcher) {
super(
leaderRetriever,
timeout,
@@ -95,7 +95,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
return Collections.singleton(new ArchivedJson(path, json));
}
- private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
+ private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) {
final long now = System.currentTimeMillis();
final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
final long endTime = executionGraph.getState().isGloballyTerminalState() ?
@@ -147,7 +147,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
AccessExecutionJobVertex ejv,
long now,
JobID jobId,
- MetricFetcher<?> metricFetcher) {
+ MetricFetcher metricFetcher) {
int[] tasksPerState = new int[ExecutionState.values().length];
long startTime = Long.MAX_VALUE;
long endTime = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index 71a919a..0cab589 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -57,7 +57,7 @@ import java.util.concurrent.Executor;
* Request handler for the job vertex details.
*/
public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist {
- private final MetricFetcher<? extends RestfulGateway> metricFetcher;
+ private final MetricFetcher metricFetcher;
public JobVertexDetailsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -66,7 +66,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
MessageHeaders<EmptyRequestBody, JobVertexDetailsInfo, JobVertexMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
- MetricFetcher<? extends RestfulGateway> metricFetcher) {
+ MetricFetcher metricFetcher) {
super(
leaderRetriever,
timeout,
@@ -106,7 +106,7 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
return archive;
}
- private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
+ private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>();
final long now = System.currentTimeMillis();
int num = 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 8984009..fdd9cff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -62,7 +62,7 @@ import java.util.concurrent.Executor;
* runtime and metrics of all its subtasks aggregated by TaskManager.
*/
public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist {
- private MetricFetcher<?> metricFetcher;
+ private MetricFetcher metricFetcher;
public JobVertexTaskManagersHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -71,7 +71,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
- MetricFetcher<?> metricFetcher) {
+ MetricFetcher metricFetcher) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
}
@@ -105,7 +105,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
return archive;
}
- private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
+ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
// Build a map that groups tasks by TaskManager
Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index f632669..3a5a5d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -46,7 +46,7 @@ import java.util.concurrent.Executor;
*/
public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
- private final MetricFetcher<?> metricFetcher;
+ private final MetricFetcher metricFetcher;
public SubtaskCurrentAttemptDetailsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -55,7 +55,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<
MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
- MetricFetcher<?> metricFetcher) {
+ MetricFetcher metricFetcher) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 1caa917..c538606 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -61,7 +61,7 @@ public class SubtaskExecutionAttemptDetailsHandler
extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters>
implements JsonArchivist {
- private final MetricFetcher<?> metricFetcher;
+ private final MetricFetcher metricFetcher;
/**
* Instantiates a new subtask execution attempt details handler.
@@ -80,7 +80,7 @@ public class SubtaskExecutionAttemptDetailsHandler
MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
- MetricFetcher<?> metricFetcher) {
+ MetricFetcher metricFetcher) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
@@ -131,7 +131,7 @@ public class SubtaskExecutionAttemptDetailsHandler
AccessExecution execution,
JobID jobID,
JobVertexID jobVertexID,
- @Nullable MetricFetcher<?> metricFetcher) {
+ @Nullable MetricFetcher metricFetcher) {
final MutableIOMetrics ioMetrics = new MutableIOMetrics();
ioMetrics.addIOMetrics(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
index be48d89..b482dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -74,7 +74,7 @@ import java.util.stream.Collectors;
public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
private final Executor executor;
- private final MetricFetcher<?> fetcher;
+ private final MetricFetcher fetcher;
protected AbstractAggregatingMetricsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -82,7 +82,7 @@ public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggreg
Map<String, String> responseHeaders,
AbstractAggregatedMetricsHeaders<P> messageHeaders,
Executor executor,
- MetricFetcher<?> fetcher) {
+ MetricFetcher fetcher) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
this.executor = Preconditions.checkNotNull(executor);
this.fetcher = Preconditions.checkNotNull(fetcher);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
index df4483c..ead1b3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHan
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
Executor executor,
- MetricFetcher<?> fetcher) {
+ MetricFetcher fetcher) {
super(leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
index bd5e83d..a2eed43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -58,7 +58,7 @@ public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetric
Time timeout,
Map<String, String> responseHeaders,
Executor executor,
- MetricFetcher<?> fetcher) {
+ MetricFetcher fetcher) {
super(leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
index e943e2b..a638132 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -51,7 +51,7 @@ public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMe
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
Executor executor,
- MetricFetcher<?> fetcher) {
+ MetricFetcher fetcher) {
super(leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 44a6169..3afdd25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -18,193 +18,23 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
-
/**
* The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
*
* <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
* the last call has passed.
*/
-public class MetricFetcher<T extends RestfulGateway> {
- private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
-
- private final GatewayRetriever<T> retriever;
- private final MetricQueryServiceRetriever queryServiceRetriever;
- private final Executor executor;
- private final Time timeout;
-
- private final MetricStore metrics = new MetricStore();
- private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
-
- private long lastUpdateTime;
-
- public MetricFetcher(
- GatewayRetriever<T> retriever,
- MetricQueryServiceRetriever queryServiceRetriever,
- Executor executor,
- Time timeout) {
- this.retriever = Preconditions.checkNotNull(retriever);
- this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
- this.executor = Preconditions.checkNotNull(executor);
- this.timeout = Preconditions.checkNotNull(timeout);
- }
-
- /**
- * Returns the MetricStore containing all stored metrics.
- *
- * @return MetricStore containing all stored metrics;
- */
- public MetricStore getMetricStore() {
- return metrics;
- }
-
- /**
- * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
- */
- public void update() {
- synchronized (this) {
- long currentTime = System.currentTimeMillis();
- if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
- lastUpdateTime = currentTime;
- fetchMetrics();
- }
- }
- }
-
- private void fetchMetrics() {
- LOG.debug("Start fetching metrics.");
-
- try {
- Optional<T> optionalLeaderGateway = retriever.getNow();
- if (optionalLeaderGateway.isPresent()) {
- final T leaderGateway = optionalLeaderGateway.get();
-
- /*
- * Remove all metrics that belong to a job that is not running and no longer archived.
- */
- CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestMultipleJobDetails(timeout);
-
- jobDetailsFuture.whenCompleteAsync(
- (MultipleJobsDetails jobDetails, Throwable throwable) -> {
- if (throwable != null) {
- LOG.debug("Fetching of JobDetails failed.", throwable);
- } else {
- ArrayList<String> toRetain = new ArrayList<>(jobDetails.getJobs().size());
- for (JobDetails job : jobDetails.getJobs()) {
- toRetain.add(job.getJobId().toString());
- }
- metrics.retainJobs(toRetain);
- }
- },
- executor);
-
- CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
-
- queryServicePathsFuture.whenCompleteAsync(
- (Collection<String> queryServicePaths, Throwable throwable) -> {
- if (throwable != null) {
- LOG.warn("Requesting paths for query services failed.", throwable);
- } else {
- for (String queryServicePath : queryServicePaths) {
- retrieveAndQueryMetrics(queryServicePath);
- }
- }
- },
- executor);
-
- // TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery
- // TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that
- // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. letting it be a cache with expiry time
- CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway
- .requestTaskManagerMetricQueryServicePaths(timeout);
-
- taskManagerQueryServicePathsFuture.whenCompleteAsync(
- (Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> {
- if (throwable != null) {
- LOG.warn("Requesting TaskManager's path for query services failed.", throwable);
- } else {
- List<String> taskManagersToRetain = queryServicePaths
- .stream()
- .map(
- (Tuple2<ResourceID, String> tuple) -> {
- retrieveAndQueryMetrics(tuple.f1);
- return tuple.f0.getResourceIdString();
- }
- ).collect(Collectors.toList());
-
- metrics.retainTaskManagers(taskManagersToRetain);
- }
- },
- executor);
- }
- } catch (Exception e) {
- LOG.warn("Exception while fetching metrics.", e);
- }
- }
+public interface MetricFetcher {
/**
- * Retrieves and queries the specified QueryServiceGateway.
+ * Get {@link MetricStore} which contains all currently fetched metrics.
*
- * @param queryServicePath specifying the QueryServiceGateway
+ * @return {@link MetricStore} with all fetched metrics
*/
- private void retrieveAndQueryMetrics(String queryServicePath) {
- LOG.debug("Retrieve metric query service gateway for {}", queryServicePath);
-
- final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
-
- queryServiceGatewayFuture.whenCompleteAsync(
- (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
- if (t != null) {
- LOG.debug("Could not retrieve QueryServiceGateway.", t);
- } else {
- queryMetrics(queryServiceGateway);
- }
- },
- executor);
- }
+ MetricStore getMetricStore();
/**
- * Query the metrics from the given QueryServiceGateway.
- *
- * @param queryServiceGateway to query for metrics
+ * Trigger fetching of metrics.
*/
- private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
- LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
-
- queryServiceGateway
- .queryMetrics(timeout)
- .whenCompleteAsync(
- (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
- if (t != null) {
- LOG.debug("Fetching metrics failed.", t);
- } else {
- metrics.addAll(deserializer.deserialize(result));
- }
- },
- executor);
- }
+ void update();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
similarity index 86%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index 44a6169..861f512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -33,24 +35,26 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
/**
- * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ * Implementation of {@link MetricFetcher} which fetches metrics from the {@link MetricQueryServiceGateway}.
*
- * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
- * the last call has passed.
+ * @param <T> type of the {@link RestfulGateway} from which to retrieve the metric query service path.
*/
-public class MetricFetcher<T extends RestfulGateway> {
- private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetcher {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherImpl.class);
private final GatewayRetriever<T> retriever;
private final MetricQueryServiceRetriever queryServiceRetriever;
@@ -62,7 +66,7 @@ public class MetricFetcher<T extends RestfulGateway> {
private long lastUpdateTime;
- public MetricFetcher(
+ public MetricFetcherImpl(
GatewayRetriever<T> retriever,
MetricQueryServiceRetriever queryServiceRetriever,
Executor executor,
@@ -78,6 +82,7 @@ public class MetricFetcher<T extends RestfulGateway> {
*
* @return MetricStore containing all stored metrics;
*/
+ @Override
public MetricStore getMetricStore() {
return metrics;
}
@@ -85,10 +90,11 @@ public class MetricFetcher<T extends RestfulGateway> {
/**
* This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
*/
+ @Override
public void update() {
synchronized (this) {
long currentTime = System.currentTimeMillis();
- if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+ if (currentTime - lastUpdateTime > 10000L) {
lastUpdateTime = currentTime;
fetchMetrics();
}
@@ -207,4 +213,19 @@ public class MetricFetcher<T extends RestfulGateway> {
},
executor);
}
+
+ @Nonnull
+ public static <T extends RestfulGateway> MetricFetcherImpl<T> fromConfiguration(
+ final Configuration configuration,
+ final MetricQueryServiceRetriever metricQueryServiceRetriever,
+ final GatewayRetriever<T> dispatcherGatewayRetriever,
+ final ExecutorService executor) {
+ final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+ return new MetricFetcherImpl<>(
+ dispatcherGatewayRetriever,
+ metricQueryServiceRetriever,
+ executor,
+ timeout);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
new file mode 100644
index 0000000..28d5450
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/VoidMetricFetcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.metrics;
+
+/**
+ * No-op {@link MetricFetcher}: {@link #update()} does nothing and {@link #getMetricStore()} always returns the same shared {@link MetricStore}, which this class never fills.
+ */
+public enum VoidMetricFetcher implements MetricFetcher {
+ INSTANCE;
+
+ private static final MetricStore METRIC_STORE = new MetricStore();
+
+ @Override
+ public MetricStore getMetricStore() {
+ return METRIC_STORE;
+ }
+
+ @Override
+ public void update() {
+ // noop: intentionally fetches nothing, so the shared store is left untouched
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 6822417..948ec93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index ec14aa3..f215ab8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandle
import org.apache.flink.runtime.rest.handler.legacy.files.StdoutFileHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
@@ -157,7 +158,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;
- private final MetricFetcher<? extends T> metricFetcher;
+ private final MetricFetcher metricFetcher;
private final LeaderElectionService leaderElectionService;
@@ -193,11 +194,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
this.checkpointStatsCache = new CheckpointStatsCache(
restConfiguration.getMaxCheckpointStatisticCacheEntries());
- this.metricFetcher = new MetricFetcher<>(
- leaderRetriever,
+ this.metricFetcher = MetricFetcherImpl.fromConfiguration(
+ clusterConfiguration,
metricQueryServiceRetriever,
- executor,
- restConfiguration.getTimeout());
+ leaderRetriever,
+ executor);
this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index f00e49f..10d670e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -118,7 +119,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
// Instance the handler.
final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
- final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+ final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
() -> null,
path -> null,
TestingUtils.defaultExecutor(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 59c6b38..6160d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -116,7 +117,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
emptyAccumulators);
// Change some fields so we can make it different from other sub tasks.
- final MetricFetcher<?> metricFetcher = new MetricFetcher<>(
+ final MetricFetcher metricFetcher = new MetricFetcherImpl<>(
() -> null,
path -> null,
TestingUtils.defaultExecutor(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
index 76f81dc..a01dd78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -67,7 +67,7 @@ public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandler
}
@Override
- protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+ protected AggregatingJobsMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
return new AggregatingJobsMetricsHandler(
leaderRetriever,
timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index 7db669a..41e5a06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
@@ -88,7 +89,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
@Before
public void setUp() throws Exception {
- MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>(
+ MetricFetcher fetcher = new MetricFetcherImpl<RestfulGateway>(
mock(GatewayRetriever.class),
mock(MetricQueryServiceRetriever.class),
Executors.directExecutor(),
@@ -122,7 +123,7 @@ public abstract class AggregatingMetricsHandlerTestBase<
Time timeout,
Map<String, String> responseHeaders,
Executor executor,
- MetricFetcher<?> fetcher
+ MetricFetcher fetcher
);
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index 3c6c9f1..06b5b71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -79,7 +79,7 @@ public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHan
}
@Override
- protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+ protected AggregatingSubtasksMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
return new AggregatingSubtasksMetricsHandler(
leaderRetriever,
timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
index 14d3777..4e2cf24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -68,7 +68,7 @@ public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetric
}
@Override
- protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+ protected AggregatingTaskManagersMetricsHandler getHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher fetcher) {
return new AggregatingTaskManagersMetricsHandler(
leaderRetriever,
timeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 61c028f..e1ec719 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -108,7 +108,7 @@ public class MetricFetcherTest extends TestLogger {
when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
// ========= start MetricFetcher testing =======================================================================
- MetricFetcher fetcher = new MetricFetcher<>(
+ MetricFetcher fetcher = new MetricFetcherImpl<>(
retriever,
queryServiceRetriever,
Executors.directExecutor(),