You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/26 19:13:21 UTC
flink git commit: [FLINK-7718] [flip6] Add JobVertexMetricsHandler to
DispatcherRestEndpoint
Repository: flink
Updated Branches:
refs/heads/master d7911c5a8 -> 1119c48c2
[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint
Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler to
new handler and add new handler to DispatcherRestEndpoint. Add common classes
for remaining implementations of
org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler,
which require migration as well.
[FLINK-7718] [flip6] Clean up JobVertexMetricsHandlerHeaders
[FLINK-7718] [flip6] Assert that HTTP code is 404 if metric is unknown
[FLINK-7718] [flip6] Minor fixes in Javadocs
[FLINK-7718] [flip6] Add unit test for AbstractMetricsHandlerHeaders
[FLINK-7718] [flip6] Let unit tests inherit from TestLogger
[FLINK-7718] [flip6] Re-format Metric constructor
[FLINK-7718] [flip6] Fix mistake in Javadoc of AbstractMetricsHandlerHeaders
[FLINK-7718] [flip6] Rename AbstractMetricsHandlerHeaders to AbstractMetricsHeaders
Strip the term Handler from the Header class. Also rename its subclasses.
[FLINK-7718] [flip6] No longer return HTTP 404 if metric is unknown
[FLINK-7718] [flip6] Make JobVertexMetricsHeaders class final
[FLINK-7718] [flip6] Introduce MetricsHandlerTestBase for future MetricHandlers
[FLINK-7718] [flip6] Always return same MessageParameter objects in JobVertexMetricsMessageParameters
This closes #5055.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1119c48c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1119c48c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1119c48c
Branch: refs/heads/master
Commit: 1119c48c2470e280aaf3441ce170245070dd0986
Parents: d7911c5
Author: gyao <ga...@data-artisans.com>
Authored: Wed Nov 22 18:58:03 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 24 18:30:47 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 10 +
.../job/metrics/AbstractMetricsHandler.java | 136 +++++++++++
.../job/metrics/JobVertexMetricsHandler.java | 68 ++++++
.../handler/legacy/metrics/MetricStore.java | 2 +-
.../job/metrics/AbstractMetricsHeaders.java | 60 +++++
.../job/metrics/JobVertexMetricsHeaders.java | 52 ++++
.../JobVertexMetricsMessageParameters.java | 41 ++++
.../rest/messages/job/metrics/Metric.java | 91 +++++++
.../metrics/MetricCollectionResponseBody.java | 113 +++++++++
.../job/metrics/MetricsFilterParameter.java | 48 ++++
.../job/metrics/AbstractMetricsHandlerTest.java | 236 +++++++++++++++++++
.../metrics/JobVertexMetricsHandlerTest.java | 67 ++++++
.../job/metrics/MetricsHandlerTestBase.java | 144 +++++++++++
.../RestResponseMarshallingTestBase.java | 12 +-
.../job/metrics/AbstractMetricsHeadersTest.java | 76 ++++++
.../MetricCollectionResponseBodyTest.java | 79 +++++++
.../job/metrics/MetricsFilterParameterTest.java | 53 +++++
17 files changed, 1286 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 3b262c7..2991f0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
@@ -67,6 +68,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDet
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -306,6 +308,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
+ final JobVertexMetricsHandler jobVertexMetricsHandler = new JobVertexMetricsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ metricFetcher);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -341,6 +350,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
+ handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..54ef081
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * @param <M> Type of the concrete {@link MessageParameters}
+ */
+public abstract class AbstractMetricsHandler<M extends MessageParameters> extends
+ AbstractRestHandler<DispatcherGateway, EmptyRequestBody, MetricCollectionResponseBody, M> {
+
+ private final MetricFetcher metricFetcher;
+
+ public AbstractMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<DispatcherGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> headers,
+ MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> messageHeaders,
+ MetricFetcher metricFetcher) {
+ super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders);
+ this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null");
+ }
+
+ @Override
+ protected final CompletableFuture<MetricCollectionResponseBody> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody, M> request,
+ @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+ metricFetcher.update();
+
+ final MetricStore.ComponentMetricStore componentMetricStore = getComponentMetricStore(
+ request,
+ metricFetcher.getMetricStore());
+
+ if (componentMetricStore == null || componentMetricStore.metrics == null) {
+ return CompletableFuture.completedFuture(
+ new MetricCollectionResponseBody(Collections.emptyList()));
+ }
+
+ final Set<String> requestedMetrics = new HashSet<>(request.getQueryParameter(
+ MetricsFilterParameter.class));
+
+ if (requestedMetrics.isEmpty()) {
+ return CompletableFuture.completedFuture(
+ new MetricCollectionResponseBody(getAvailableMetrics(componentMetricStore)));
+ } else {
+ final List<Metric> metrics = getRequestedMetrics(componentMetricStore, requestedMetrics);
+ return CompletableFuture.completedFuture(new MetricCollectionResponseBody(metrics));
+ }
+ }
+
+ /**
+ * Returns the {@link MetricStore.ComponentMetricStore} that should be queried for metrics.
+ */
+ @Nullable
+ protected abstract MetricStore.ComponentMetricStore getComponentMetricStore(
+ HandlerRequest<EmptyRequestBody, M> request,
+ MetricStore metricStore);
+
+ private static List<Metric> getAvailableMetrics(MetricStore.ComponentMetricStore componentMetricStore) {
+ return componentMetricStore.metrics
+ .keySet()
+ .stream()
+ .map(Metric::new)
+ .collect(Collectors.toList());
+ }
+
+ private static List<Metric> getRequestedMetrics(
+ MetricStore.ComponentMetricStore componentMetricStore,
+ Set<String> requestedMetrics) throws RestHandlerException {
+
+ final List<Metric> metrics = new ArrayList<>(requestedMetrics.size());
+ for (final String requestedMetric : requestedMetrics) {
+ final String value = componentMetricStore.getMetric(requestedMetric, null);
+ if (value != null) {
+ metrics.add(new Metric(requestedMetric, value));
+ }
+ }
+ return metrics;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..4f83db2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns metrics given a {@link JobID} and {@link JobVertexID}.
+ *
+ * @see MetricStore#getTaskMetricStore(String, String)
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMetricsMessageParameters> {
+
+ public JobVertexMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<DispatcherGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> headers,
+ MetricFetcher metricFetcher) {
+
+ super(localRestAddress, leaderRetriever, timeout, headers,
+ JobVertexMetricsHeaders.getInstance(),
+ metricFetcher);
+ }
+
+ @Override
+ protected MetricStore.ComponentMetricStore getComponentMetricStore(
+ HandlerRequest<EmptyRequestBody, JobVertexMetricsMessageParameters> request,
+ MetricStore metricStore) {
+
+ final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+ final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
+
+ return metricStore.getTaskMetricStore(jobId.toString(), vertexId.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 473b9c2..26025e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -167,7 +167,7 @@ public class MetricStore {
}
@VisibleForTesting
- void add(MetricDump metric) {
+ public void add(MetricDump metric) {
try {
QueryScopeInfo info = metric.scopeInfo;
TaskManagerMetricStore tm;
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
new file mode 100644
index 0000000..2deef63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}.
+ */
+public abstract class AbstractMetricsHeaders<M extends MessageParameters> implements
+ MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> {
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<MetricCollectionResponseBody> getResponseClass() {
+ return MetricCollectionResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public abstract M getUnresolvedMessageParameters();
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public abstract String getTargetRestEndpointURL();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java
new file mode 100644
index 0000000..002b76d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+/**
+ * {@link MessageHeaders} for {@link JobVertexMetricsHandler}.
+ */
+public final class JobVertexMetricsHeaders extends
+ AbstractMetricsHeaders<JobVertexMetricsMessageParameters> {
+
+ private static final JobVertexMetricsHeaders INSTANCE =
+ new JobVertexMetricsHeaders();
+
+ private JobVertexMetricsHeaders() {
+ }
+
+ @Override
+ public JobVertexMetricsMessageParameters getUnresolvedMessageParameters() {
+ return new JobVertexMetricsMessageParameters();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/metrics";
+ }
+
+ public static JobVertexMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java
new file mode 100644
index 0000000..77fd7a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link JobVertexMetricsHandler}.
+ */
+public class JobVertexMetricsMessageParameters extends JobVertexMessageParameters {
+
+ private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.singletonList(metricsFilterParameter);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java
new file mode 100644
index 0000000..84f9b49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for a Metric and Metric-Value-Pair.
+ *
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+public class Metric {
+
+ private static final String FIELD_NAME_ID = "id";
+
+ private static final String FIELD_NAME_VALUE = "value";
+
+ @JsonProperty(value = FIELD_NAME_ID, required = true)
+ private final String id;
+
+ /**
+ * The value of the metric. If <code>null</code>, the field should not show up in the JSON
+ * representation.
+ */
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(value = FIELD_NAME_VALUE)
+ private final String value;
+
+ /**
+ * Creates a new {@link Metric} with a possible value.
+ *
+ * @param id Name of the metric.
+ * @param value Value of the metric. Can be <code>null</code>.
+ */
+ @JsonCreator
+ public Metric(
+ final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+ final @Nullable @JsonProperty(FIELD_NAME_VALUE) String value) {
+
+ this.id = requireNonNull(id, "id must not be null");
+ this.value = value;
+ }
+
+ /**
+ * Creates a new {@link Metric} without a value.
+ *
+ * @param id Name of the metric.
+ */
+ public Metric(final String id) {
+ this(id, null);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "Metric{" +
+ "id='" + id + '\'' +
+ ", value='" + value + '\'' +
+ '}';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java
new file mode 100644
index 0000000..f6c380d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for a collection of metrics.
+ *
+ * <p>As JSON this type will be represented as an array of
+ * metrics, i.e., the field <code>metrics</code> will not show up. For example, a collection with a
+ * single metric will be represented as follows:
+ * <pre>
+ * {@code
+ * [{"id": "metricName", "value": "1"}]
+ * }
+ * </pre>
+ *
+ * @see Serializer
+ * @see Deserializer
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+@JsonSerialize(using = MetricCollectionResponseBody.Serializer.class)
+@JsonDeserialize(using = MetricCollectionResponseBody.Deserializer.class)
+public final class MetricCollectionResponseBody implements ResponseBody {
+
+ private final Collection<Metric> metrics;
+
+ public MetricCollectionResponseBody(Collection<Metric> metrics) {
+ this.metrics = requireNonNull(metrics, "metrics must not be null");
+ }
+
+ public Collection<Metric> getMetrics() {
+ return metrics;
+ }
+
+ /**
+ * JSON serializer for {@link MetricCollectionResponseBody}.
+ */
+ public static class Serializer extends StdSerializer<MetricCollectionResponseBody> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Serializer() {
+ super(MetricCollectionResponseBody.class);
+ }
+
+ @Override
+ public void serialize(
+ MetricCollectionResponseBody metricCollectionResponseBody,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+
+ jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics());
+ }
+ }
+
+ /**
+ * JSON deserializer for {@link MetricCollectionResponseBody}.
+ */
+ public static class Deserializer extends StdDeserializer<MetricCollectionResponseBody> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Deserializer() {
+ super(MetricCollectionResponseBody.class);
+ }
+
+ @Override
+ public MetricCollectionResponseBody deserialize(
+ JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException {
+
+ return new MetricCollectionResponseBody(jsonParser.readValueAs(
+ new TypeReference<List<Metric>>() {
+ }));
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java
new file mode 100644
index 0000000..b01d2a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for filtering metrics provided by
+ * {@link MetricStore}.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler
+ */
+public class MetricsFilterParameter extends MessageQueryParameter<String> {
+
+ private static final String QUERY_PARAMETER_NAME = "get";
+
+ public MetricsFilterParameter() {
+ super(QUERY_PARAMETER_NAME, MessageParameterRequisiteness.OPTIONAL);
+ }
+
+ @Override
+ public String convertValueFromString(String value) {
+ return value;
+ }
+
+ @Override
+ public String convertStringToValue(String value) {
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
new file mode 100644
index 0000000..241bbba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractMetricsHandler}.
+ */
+public class AbstractMetricsHandlerTest extends TestLogger {
+
+ private static final String TEST_METRIC_NAME = "test_counter";
+
+ private static final int TEST_METRIC_VALUE = 1000;
+
+ private static final String METRICS_FILTER_QUERY_PARAM = "get";
+
+ @Mock
+ private MetricFetcher mockMetricFetcher;
+
+ @Mock
+ private DispatcherGateway mockDispatcherGateway;
+
+ private TestMetricsHandler testMetricsHandler;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ final MetricStore metricStore = new MetricStore();
+ metricStore.add(new MetricDump.CounterDump(
+ new QueryScopeInfo.JobManagerQueryScopeInfo(),
+ TEST_METRIC_NAME,
+ TEST_METRIC_VALUE));
+
+ when(mockMetricFetcher.getMetricStore()).thenReturn(metricStore);
+
+ testMetricsHandler = new TestMetricsHandler(
+ CompletableFuture.completedFuture("localhost:1234"),
+ new GatewayRetriever<DispatcherGateway>() {
+ @Override
+ public CompletableFuture<DispatcherGateway> getFuture() {
+ return CompletableFuture.completedFuture(mockDispatcherGateway);
+ }
+ },
+ Time.milliseconds(50),
+ Collections.emptyMap(),
+ new TestMetricsHeaders(),
+ mockMetricFetcher);
+ }
+
+ @Test
+ public void testListMetrics() throws Exception {
+ final CompletableFuture<MetricCollectionResponseBody> completableFuture =
+ testMetricsHandler.handleRequest(
+ new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ new TestMessageParameters(),
+ Collections.emptyMap(),
+ Collections.emptyMap()),
+ mockDispatcherGateway);
+
+ assertTrue(completableFuture.isDone());
+
+ final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
+ assertThat(metricCollectionResponseBody.getMetrics(), hasSize(1));
+
+ final Metric metric = metricCollectionResponseBody.getMetrics().iterator().next();
+ assertThat(metric.getId(), equalTo(TEST_METRIC_NAME));
+ assertThat(metric.getValue(), equalTo(null));
+ }
+
+ @Test
+ public void testReturnEmptyListIfNoComponentMetricStore() throws Exception {
+ testMetricsHandler.returnComponentMetricStore = false;
+
+ final CompletableFuture<MetricCollectionResponseBody> completableFuture =
+ testMetricsHandler.handleRequest(
+ new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ new TestMessageParameters(),
+ Collections.emptyMap(),
+ Collections.emptyMap()),
+ mockDispatcherGateway);
+
+ assertTrue(completableFuture.isDone());
+
+ final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
+ assertThat(metricCollectionResponseBody.getMetrics(), empty());
+ }
+
+ @Test
+ public void testGetMetrics() throws Exception {
+ final CompletableFuture<MetricCollectionResponseBody> completableFuture =
+ testMetricsHandler.handleRequest(
+ new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ new TestMessageParameters(),
+ Collections.emptyMap(),
+ Collections.singletonMap(METRICS_FILTER_QUERY_PARAM, Collections.singletonList(TEST_METRIC_NAME))),
+ mockDispatcherGateway);
+
+ assertTrue(completableFuture.isDone());
+
+ final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
+ assertThat(metricCollectionResponseBody.getMetrics(), hasSize(1));
+
+ final Metric metric = metricCollectionResponseBody.getMetrics().iterator().next();
+ assertThat(metric.getId(), equalTo(TEST_METRIC_NAME));
+ assertThat(metric.getValue(), equalTo(Integer.toString(TEST_METRIC_VALUE)));
+ }
+
+ @Test
+ public void testReturnEmptyListIfRequestedMetricIsUnknown() throws Exception {
+ final CompletableFuture<MetricCollectionResponseBody> completableFuture =
+ testMetricsHandler.handleRequest(
+ new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ new TestMessageParameters(),
+ Collections.emptyMap(),
+ Collections.singletonMap(METRICS_FILTER_QUERY_PARAM, Collections.singletonList("unknown_metric"))),
+ mockDispatcherGateway);
+
+ assertTrue(completableFuture.isDone());
+
+ final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
+ assertThat(metricCollectionResponseBody.getMetrics(), empty());
+ }
+
+ private static class TestMetricsHandler extends AbstractMetricsHandler<TestMessageParameters> {
+
+ private boolean returnComponentMetricStore = true;
+
+ private TestMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<DispatcherGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> headers,
+ MessageHeaders<EmptyRequestBody,
+ MetricCollectionResponseBody,
+ TestMessageParameters> messageHeaders,
+ MetricFetcher metricFetcher) {
+
+ super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders, metricFetcher);
+ }
+
+ @Nullable
+ @Override
+ protected MetricStore.ComponentMetricStore getComponentMetricStore(
+ HandlerRequest<EmptyRequestBody,
+ TestMessageParameters> request,
+ MetricStore metricStore) {
+ return returnComponentMetricStore ? metricStore.getJobManager() : null;
+ }
+ }
+
+ private static class TestMetricsHeaders extends
+ AbstractMetricsHeaders<TestMessageParameters> {
+
+ @Override
+ public TestMessageParameters getUnresolvedMessageParameters() {
+ return new TestMessageParameters();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/";
+ }
+ }
+
+ private static class TestMessageParameters extends MessageParameters {
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.singletonList(new MetricsFilterParameter());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
new file mode 100644
index 0000000..82d331e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link JobVertexMetricsHandler}.
+ */
+public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVertexMetricsHandler> {
+
+ private static final String TEST_JOB_ID = new JobID().toString();
+
+ private static final String TEST_VERTEX_ID = new JobVertexID().toString();
+
+ private static final int TEST_SUBTASK_INDEX = 1;
+
+ @Override
+ JobVertexMetricsHandler getMetricsHandler() {
+ return new JobVertexMetricsHandler(
+ TEST_REST_ADDRESS,
+ leaderRetriever,
+ TIMEOUT,
+ TEST_HEADERS,
+ mockMetricFetcher);
+ }
+
+ @Override
+ QueryScopeInfo getQueryScopeInfo() {
+ return new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+ }
+
+ @Override
+ Map<String, String> getPathParameters() {
+ final HashMap<String, String> pathParameters = new HashMap<>();
+ pathParameters.put("jobid", TEST_JOB_ID);
+ pathParameters.put("vertexid", TEST_VERTEX_ID);
+ return pathParameters;
+ }
+
+ @Override
+ String getExpectedIdForMetricName(final String metricName) {
+ return String.format("%s.%s", TEST_SUBTASK_INDEX, metricName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
new file mode 100644
index 0000000..05a1163
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test base class for subclasses of {@link AbstractMetricsHandler}.
+ */
+public abstract class MetricsHandlerTestBase<T extends
+ AbstractMetricsHandler> extends TestLogger {
+
+ private static final String TEST_METRIC_NAME = "test_counter";
+
+ private static final int TEST_METRIC_VALUE = 1000;
+
+ static final CompletableFuture<String> TEST_REST_ADDRESS =
+ CompletableFuture.completedFuture("localhost:12345");
+
+ static final Time TIMEOUT = Time.milliseconds(50);
+
+ static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
+
+ @Mock
+ MetricFetcher mockMetricFetcher;
+
+ GatewayRetriever<DispatcherGateway> leaderRetriever;
+
+ @Mock
+ private DispatcherGateway mockDispatcherGateway;
+
+ private T metricsHandler;
+
+ private Map<String, String> pathParameters;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ this.leaderRetriever = new GatewayRetriever<DispatcherGateway>() {
+ @Override
+ public CompletableFuture<DispatcherGateway> getFuture() {
+ return CompletableFuture.completedFuture(mockDispatcherGateway);
+ }
+ };
+ this.pathParameters = getPathParameters();
+ this.metricsHandler = getMetricsHandler();
+
+ final MetricStore metricStore = new MetricStore();
+ metricStore.add(new MetricDump.CounterDump(getQueryScopeInfo(), TEST_METRIC_NAME,
+ TEST_METRIC_VALUE));
+ when(mockMetricFetcher.getMetricStore()).thenReturn(metricStore);
+ }
+
+ /**
+ * Tests that the metric with name defined under {@link #TEST_METRIC_NAME} can be retrieved
+ * from the {@link MetricStore.ComponentMetricStore} returned from
+ * {@link AbstractMetricsHandler#getComponentMetricStore(HandlerRequest, MetricStore)}.
+ */
+ @Test
+ public void testGetMetric() throws Exception {
+ @SuppressWarnings("unchecked") final CompletableFuture<MetricCollectionResponseBody> completableFuture =
+ metricsHandler.handleRequest(
+ new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ metricsHandler.getMessageHeaders().getUnresolvedMessageParameters(),
+ pathParameters,
+ Collections.emptyMap()),
+ mockDispatcherGateway);
+
+ assertTrue(completableFuture.isDone());
+
+ final MetricCollectionResponseBody metricCollectionResponseBody = completableFuture.get();
+ assertThat(metricCollectionResponseBody.getMetrics(), hasSize(1));
+
+ final Metric metric = metricCollectionResponseBody.getMetrics().iterator().next();
+ assertThat(metric.getId(), equalTo(getExpectedIdForMetricName(TEST_METRIC_NAME)));
+ }
+
+ /**
+ * Returns instance under test.
+ */
+ abstract T getMetricsHandler();
+
+ abstract QueryScopeInfo getQueryScopeInfo();
+
+ abstract Map<String, String> getPathParameters();
+
+ /**
+ * Returns the expected metric id for a given metric name. By default the metric name without
+ * any modifications is returned.
+ *
+ * @param metricName The metric name.
+ * @return The id of the metric name possibly with additional information, e.g., subtask index
+ * as a prefix.
+ *
+ */
+ String getExpectedIdForMetricName(final String metricName) {
+ return metricName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
index bbdbf98..2604bd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
@@ -57,7 +57,17 @@ public abstract class RestResponseMarshallingTestBase<R extends ResponseBody> ex
JsonNode json = objectMapper.valueToTree(expected);
final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass());
- Assert.assertEquals(expected, unmarshalled);
+ assertOriginalEqualsToUnmarshalled(expected, unmarshalled);
+ }
+
+ /**
+ * Asserts that two objects are equal. If they are not, an {@link AssertionError} is thrown.
+ *
+ * @param expected expected value
+ * @param actual the value to check against expected
+ */
+ protected void assertOriginalEqualsToUnmarshalled(R expected, R actual) {
+ Assert.assertEquals(expected, actual);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
new file mode 100644
index 0000000..51a6ff8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AbstractMetricsHeaders}.
+ */
+public class AbstractMetricsHeadersTest extends TestLogger {
+
+ private AbstractMetricsHeaders<EmptyMessageParameters> metricsHandlerHeaders;
+
+ @Before
+ public void setUp() throws Exception {
+ metricsHandlerHeaders = new AbstractMetricsHeaders<EmptyMessageParameters>() {
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/";
+ }
+ };
+ }
+
+ @Test
+ public void testHttpMethod() {
+ assertThat(metricsHandlerHeaders.getHttpMethod(), equalTo(HttpMethodWrapper.GET));
+ }
+
+ @Test
+ public void testResponseStatus() {
+ assertThat(metricsHandlerHeaders.getResponseStatusCode(), equalTo(HttpResponseStatus.OK));
+ }
+
+ @Test
+ public void testRequestClass() {
+ assertThat(metricsHandlerHeaders.getRequestClass(), equalTo(EmptyRequestBody.class));
+ }
+
+ @Test
+ public void testResponseClass() {
+ assertThat(metricsHandlerHeaders.getResponseClass(), equalTo(MetricCollectionResponseBody.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java
new file mode 100644
index 0000000..dfb2682
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+
+/**
+ * Tests for {@link MetricCollectionResponseBody}.
+ */
+public class MetricCollectionResponseBodyTest extends
+ RestResponseMarshallingTestBase<MetricCollectionResponseBody> {
+
+ private static final String TEST_METRIC_NAME = "metric1";
+
+ private static final String TEST_METRIC_VALUE = "1000";
+
+ @Override
+ protected Class<MetricCollectionResponseBody> getTestResponseClass() {
+ return MetricCollectionResponseBody.class;
+ }
+
+ @Override
+ protected MetricCollectionResponseBody getTestResponseInstance() {
+ return new MetricCollectionResponseBody(Collections.singleton(new Metric(
+ TEST_METRIC_NAME,
+ TEST_METRIC_VALUE)));
+ }
+
+ @Override
+ protected void assertOriginalEqualsToUnmarshalled(
+ MetricCollectionResponseBody expected,
+ MetricCollectionResponseBody actual) {
+
+ assertThat(actual.getMetrics(), hasSize(1));
+
+ final Metric metric = actual.getMetrics().iterator().next();
+ assertThat(metric.getId(), equalTo(TEST_METRIC_NAME));
+ assertThat(metric.getValue(), equalTo(TEST_METRIC_VALUE));
+ }
+
+ @Test
+ public void testNullValueNotSerialized() throws Exception {
+ final String json = RestMapperUtils.getStrictObjectMapper()
+ .writeValueAsString(
+ new MetricCollectionResponseBody(
+ Collections.singleton(new Metric(TEST_METRIC_NAME))));
+
+ assertThat(json, not(containsString("\"value\"")));
+ assertThat(json, not(containsString("\"metrics\"")));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
new file mode 100644
index 0000000..2756a65
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link MetricsFilterParameter}.
+ */
+public class MetricsFilterParameterTest extends TestLogger {
+
+ private MetricsFilterParameter metricsFilterParameter;
+
+ @Before
+ public void setUp() {
+ metricsFilterParameter = new MetricsFilterParameter();
+ }
+
+ @Test
+ public void testIsOptionalParameter() {
+ assertFalse(metricsFilterParameter.isMandatory());
+ }
+
+ @Test
+ public void testConversions() {
+ assertThat(metricsFilterParameter.convertStringToValue("test"), equalTo("test"));
+ assertThat(metricsFilterParameter.convertValueFromString("test"), equalTo("test"));
+ }
+
+}