You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/26 19:13:21 UTC

flink git commit: [FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

Repository: flink
Updated Branches:
  refs/heads/master d7911c5a8 -> 1119c48c2


[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler to
new handler and add new handler to DispatcherRestEndpoint. Add common classes
for remaining implementations of
org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler,
which require migration as well.

[FLINK-7718] [flip6] Clean up JobVertexMetricsHandlerHeaders

[FLINK-7718] [flip6] Assert that HTTP code is 404 if metric is unknown

[FLINK-7718] [flip6] Minor fixes in Javadocs

[FLINK-7718] [flip6] Add unit test for AbstractMetricsHandlerHeaders

[FLINK-7718] [flip6] Let unit tests inherit from TestLogger

[FLINK-7718] [flip6] Re-format Metric constructor

[FLINK-7718] [flip6] Fix mistake in Javadoc of AbstractMetricsHandlerHeaders

[FLINK-7718] [flip6] Rename AbstractMetricsHandlerHeaders to AbstractMetricsHeaders

Strip the term Handler from the Header class. Also rename its subclasses.

[FLINK-7718] [flip6] No longer return HTTP 404 if metric is unknown

[FLINK-7718] [flip6] Make JobVertexMetricsHeaders class final

[FLINK-7718] [flip6] Introduce MetricsHandlerTestBase for future MetricHandlers

[FLINK-7718] [flip6] Always return same MessageParameter objects in JobVertexMetricsMessageParameters

This closes #5055.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1119c48c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1119c48c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1119c48c

Branch: refs/heads/master
Commit: 1119c48c2470e280aaf3441ce170245070dd0986
Parents: d7911c5
Author: gyao <ga...@data-artisans.com>
Authored: Wed Nov 22 18:58:03 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 24 18:30:47 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  10 +
 .../job/metrics/AbstractMetricsHandler.java     | 136 +++++++++++
 .../job/metrics/JobVertexMetricsHandler.java    |  68 ++++++
 .../handler/legacy/metrics/MetricStore.java     |   2 +-
 .../job/metrics/AbstractMetricsHeaders.java     |  60 +++++
 .../job/metrics/JobVertexMetricsHeaders.java    |  52 ++++
 .../JobVertexMetricsMessageParameters.java      |  41 ++++
 .../rest/messages/job/metrics/Metric.java       |  91 +++++++
 .../metrics/MetricCollectionResponseBody.java   | 113 +++++++++
 .../job/metrics/MetricsFilterParameter.java     |  48 ++++
 .../job/metrics/AbstractMetricsHandlerTest.java | 236 +++++++++++++++++++
 .../metrics/JobVertexMetricsHandlerTest.java    |  67 ++++++
 .../job/metrics/MetricsHandlerTestBase.java     | 144 +++++++++++
 .../RestResponseMarshallingTestBase.java        |  12 +-
 .../job/metrics/AbstractMetricsHeadersTest.java |  76 ++++++
 .../MetricCollectionResponseBodyTest.java       |  79 +++++++
 .../job/metrics/MetricsFilterParameterTest.java |  53 +++++
 17 files changed, 1286 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 3b262c7..2991f0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
@@ -67,6 +68,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDet
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -306,6 +308,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor);
 
+		final JobVertexMetricsHandler jobVertexMetricsHandler = new JobVertexMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -341,6 +350,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
 		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
 		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
+		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
 
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..54ef081
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Request handler that returns, for a given component (e.g., a task), either a list of all available metrics or the values of a set of requested metrics.
+ *
+ * <p>If the query parameters do not contain a "get" parameter, the list of all available metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics; metrics that are unknown are omitted.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * @param <M> Type of the concrete {@link MessageParameters}
+ */
+public abstract class AbstractMetricsHandler<M extends MessageParameters> extends
+	AbstractRestHandler<DispatcherGateway, EmptyRequestBody, MetricCollectionResponseBody, M> {
+
+	private final MetricFetcher metricFetcher;
+
+	public AbstractMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> headers,
+			MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> messageHeaders,
+			MetricFetcher metricFetcher) {
+		super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders);
+		this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null");
+	}
+
+	@Override
+	protected final CompletableFuture<MetricCollectionResponseBody> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, M> request,
+			@Nonnull DispatcherGateway gateway) throws RestHandlerException {
+		metricFetcher.update();
+
+		final MetricStore.ComponentMetricStore componentMetricStore = getComponentMetricStore(
+			request,
+			metricFetcher.getMetricStore());
+
+		if (componentMetricStore == null || componentMetricStore.metrics == null) {
+			return CompletableFuture.completedFuture(
+				new MetricCollectionResponseBody(Collections.emptyList()));
+		}
+
+		final Set<String> requestedMetrics = new HashSet<>(request.getQueryParameter(
+			MetricsFilterParameter.class));
+
+		if (requestedMetrics.isEmpty()) {
+			return CompletableFuture.completedFuture(
+				new MetricCollectionResponseBody(getAvailableMetrics(componentMetricStore)));
+		} else {
+			final List<Metric> metrics = getRequestedMetrics(componentMetricStore, requestedMetrics);
+			return CompletableFuture.completedFuture(new MetricCollectionResponseBody(metrics));
+		}
+	}
+
+	/**
+	 * Returns the {@link MetricStore.ComponentMetricStore} that should be queried for metrics.
+	 */
+	@Nullable
+	protected abstract MetricStore.ComponentMetricStore getComponentMetricStore(
+		HandlerRequest<EmptyRequestBody, M> request,
+		MetricStore metricStore);
+
+	private static List<Metric> getAvailableMetrics(MetricStore.ComponentMetricStore componentMetricStore) {
+		return componentMetricStore.metrics
+			.keySet()
+			.stream()
+			.map(Metric::new)
+			.collect(Collectors.toList());
+	}
+
+	private static List<Metric> getRequestedMetrics(
+			MetricStore.ComponentMetricStore componentMetricStore,
+			Set<String> requestedMetrics) throws RestHandlerException {
+
+		final List<Metric> metrics = new ArrayList<>(requestedMetrics.size());
+		for (final String requestedMetric : requestedMetrics) {
+			final String value = componentMetricStore.getMetric(requestedMetric, null);
+			if (value != null) {
+				metrics.add(new Metric(requestedMetric, value));
+			}
+		}
+		return metrics;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..4f83db2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns metrics given a {@link JobID} and {@link JobVertexID}.
+ *
+ * @see MetricStore#getTaskMetricStore(String, String)
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMetricsMessageParameters> {
+
+	public JobVertexMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> headers,
+			MetricFetcher metricFetcher) {
+
+		super(localRestAddress, leaderRetriever, timeout, headers,
+			JobVertexMetricsHeaders.getInstance(),
+			metricFetcher);
+	}
+
+	@Override
+	protected MetricStore.ComponentMetricStore getComponentMetricStore(
+			HandlerRequest<EmptyRequestBody, JobVertexMetricsMessageParameters> request,
+			MetricStore metricStore) {
+
+		final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		return metricStore.getTaskMetricStore(jobId.toString(), vertexId.toString());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 473b9c2..26025e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -167,7 +167,7 @@ public class MetricStore {
 	}
 
 	@VisibleForTesting
-	void add(MetricDump metric) {
+	public void add(MetricDump metric) {
 		try {
 			QueryScopeInfo info = metric.scopeInfo;
 			TaskManagerMetricStore tm;

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
new file mode 100644
index 0000000..2deef63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}.
+ */
+public abstract class AbstractMetricsHeaders<M extends MessageParameters> implements
+	MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> {
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<MetricCollectionResponseBody> getResponseClass() {
+		return MetricCollectionResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public abstract M getUnresolvedMessageParameters();
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public abstract String getTargetRestEndpointURL();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java
new file mode 100644
index 0000000..002b76d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeaders.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+/**
+ * {@link MessageHeaders} for {@link JobVertexMetricsHandler}.
+ */
+public final class JobVertexMetricsHeaders extends
+	AbstractMetricsHeaders<JobVertexMetricsMessageParameters> {
+
+	private static final JobVertexMetricsHeaders INSTANCE =
+		new JobVertexMetricsHeaders();
+
+	private JobVertexMetricsHeaders() {
+	}
+
+	@Override
+	public JobVertexMetricsMessageParameters getUnresolvedMessageParameters() {
+		return new JobVertexMetricsMessageParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/metrics";
+	}
+
+	public static JobVertexMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java
new file mode 100644
index 0000000..77fd7a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsMessageParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link JobVertexMetricsHandler}.
+ */
+public class JobVertexMetricsMessageParameters extends JobVertexMessageParameters {
+
+	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singletonList(metricsFilterParameter);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java
new file mode 100644
index 0000000..84f9b49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/Metric.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for a metric, optionally paired with its value.
+ *
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+public class Metric {
+
+	private static final String FIELD_NAME_ID = "id";
+
+	private static final String FIELD_NAME_VALUE = "value";
+
+	@JsonProperty(value = FIELD_NAME_ID, required = true)
+	private final String id;
+
+	/**
+	 * The value of the metric. If <code>null</code>, the field should not show up in the JSON
+	 * representation.
+	 */
+	@JsonInclude(JsonInclude.Include.NON_NULL)
+	@JsonProperty(value = FIELD_NAME_VALUE)
+	private final String value;
+
+	/**
+	 * Creates a new {@link Metric} with a possible value.
+	 *
+	 * @param id    Name of the metric.
+	 * @param value Value of the metric. Can be <code>null</code>.
+	 */
+	@JsonCreator
+	public Metric(
+			final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+			final @Nullable @JsonProperty(FIELD_NAME_VALUE) String value) {
+
+		this.id = requireNonNull(id, "id must not be null");
+		this.value = value;
+	}
+
+	/**
+	 * Creates a new {@link Metric} without a value.
+	 *
+	 * @param id Name of the metric.
+	 */
+	public Metric(final String id) {
+		this(id, null);
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public String getValue() {
+		return value;
+	}
+
+	@Override
+	public String toString() {
+		return "Metric{" +
+			"id='" + id + '\'' +
+			", value='" + value + '\'' +
+			'}';
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java
new file mode 100644
index 0000000..f6c380d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBody.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for a collection of metrics.
+ *
+ * <p>As JSON this type will be represented as an array of
+ * metrics, i.e., the field <code>metrics</code> will not show up. For example, a collection with a
+ * single metric will be represented as follows:
+ * <pre>
+ * {@code
+ * [{"id": "metricName", "value": "1"}]
+ * }
+ * </pre>
+ *
+ * @see Serializer
+ * @see Deserializer
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+@JsonSerialize(using = MetricCollectionResponseBody.Serializer.class)
+@JsonDeserialize(using = MetricCollectionResponseBody.Deserializer.class)
+public final class MetricCollectionResponseBody implements ResponseBody {
+
+	private final Collection<Metric> metrics;
+
+	public MetricCollectionResponseBody(Collection<Metric> metrics) {
+		this.metrics = requireNonNull(metrics, "metrics must not be null");
+	}
+
+	public Collection<Metric> getMetrics() {
+		return metrics;
+	}
+
+	/**
+	 * JSON serializer for {@link MetricCollectionResponseBody}.
+	 */
+	public static class Serializer extends StdSerializer<MetricCollectionResponseBody> {
+
+		private static final long serialVersionUID = 1L;
+
+		protected Serializer() {
+			super(MetricCollectionResponseBody.class);
+		}
+
+		@Override
+		public void serialize(
+				MetricCollectionResponseBody metricCollectionResponseBody,
+				JsonGenerator jsonGenerator,
+				SerializerProvider serializerProvider) throws IOException {
+
+			jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics());
+		}
+	}
+
+	/**
+	 * JSON deserializer for {@link MetricCollectionResponseBody}.
+	 */
+	public static class Deserializer extends StdDeserializer<MetricCollectionResponseBody> {
+
+		private static final long serialVersionUID = 1L;
+
+		protected Deserializer() {
+			super(MetricCollectionResponseBody.class);
+		}
+
+		@Override
+		public MetricCollectionResponseBody deserialize(
+				JsonParser jsonParser,
+				DeserializationContext deserializationContext) throws IOException {
+
+			return new MetricCollectionResponseBody(jsonParser.readValueAs(
+				new TypeReference<List<Metric>>() {
+				}));
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java
new file mode 100644
index 0000000..b01d2a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * Optional {@link MessageQueryParameter} named {@code get} that is used to filter the metrics
+ * provided by {@link MetricStore}.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler
+ */
+public class MetricsFilterParameter extends MessageQueryParameter<String> {
+
+	private static final String FILTER_QUERY_KEY = "get";
+
+	public MetricsFilterParameter() {
+		super(FILTER_QUERY_KEY, MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	/**
+	 * Returns the given string unchanged, as the values of this parameter are plain strings.
+	 */
+	@Override
+	public String convertValueFromString(String value) {
+		return value;
+	}
+
+	/**
+	 * Returns the given value unchanged, as the values of this parameter are plain strings.
+	 */
+	@Override
+	public String convertStringToValue(String value) {
+		return value;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
new file mode 100644
index 0000000..241bbba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the listing and filtering logic of {@link AbstractMetricsHandler}.
+ */
+public class AbstractMetricsHandlerTest extends TestLogger {
+
+	private static final String TEST_METRIC_NAME = "test_counter";
+
+	private static final int TEST_METRIC_VALUE = 1000;
+
+	private static final String METRICS_FILTER_QUERY_PARAM = "get";
+
+	@Mock
+	private MetricFetcher mockMetricFetcher;
+
+	@Mock
+	private DispatcherGateway mockDispatcherGateway;
+
+	private TestMetricsHandler testMetricsHandler;
+
+	@Before
+	public void setUp() {
+		MockitoAnnotations.initMocks(this);
+
+		// The mocked fetcher serves a store that holds exactly one JobManager-scoped counter.
+		final MetricDump.CounterDump counter = new MetricDump.CounterDump(
+			new QueryScopeInfo.JobManagerQueryScopeInfo(),
+			TEST_METRIC_NAME,
+			TEST_METRIC_VALUE);
+		final MetricStore store = new MetricStore();
+		store.add(counter);
+		when(mockMetricFetcher.getMetricStore()).thenReturn(store);
+
+		final GatewayRetriever<DispatcherGateway> gatewayRetriever =
+			new GatewayRetriever<DispatcherGateway>() {
+				@Override
+				public CompletableFuture<DispatcherGateway> getFuture() {
+					return CompletableFuture.completedFuture(mockDispatcherGateway);
+				}
+			};
+
+		testMetricsHandler = new TestMetricsHandler(
+			CompletableFuture.completedFuture("localhost:1234"),
+			gatewayRetriever,
+			Time.milliseconds(50),
+			Collections.emptyMap(),
+			new TestMetricsHeaders(),
+			mockMetricFetcher);
+	}
+
+	/**
+	 * Without a filter the handler lists the known metric ids and leaves their values unset.
+	 */
+	@Test
+	public void testListMetrics() throws Exception {
+		final MetricCollectionResponseBody responseBody = requestMetrics(
+			new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				new TestMessageParameters(),
+				Collections.emptyMap(),
+				Collections.emptyMap()));
+
+		assertThat(responseBody.getMetrics(), hasSize(1));
+
+		final Metric listedMetric = responseBody.getMetrics().iterator().next();
+		assertThat(listedMetric.getId(), equalTo(TEST_METRIC_NAME));
+		assertThat(listedMetric.getValue(), equalTo(null));
+	}
+
+	/**
+	 * A missing component metric store results in an empty response instead of a failure.
+	 */
+	@Test
+	public void testReturnEmptyListIfNoComponentMetricStore() throws Exception {
+		testMetricsHandler.returnComponentMetricStore = false;
+
+		final MetricCollectionResponseBody responseBody = requestMetrics(
+			new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				new TestMessageParameters(),
+				Collections.emptyMap(),
+				Collections.emptyMap()));
+
+		assertThat(responseBody.getMetrics(), empty());
+	}
+
+	/**
+	 * Requesting a known metric through the filter parameter returns its id and its value.
+	 */
+	@Test
+	public void testGetMetrics() throws Exception {
+		final MetricCollectionResponseBody responseBody = requestMetrics(
+			new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				new TestMessageParameters(),
+				Collections.emptyMap(),
+				Collections.singletonMap(METRICS_FILTER_QUERY_PARAM, Collections.singletonList(TEST_METRIC_NAME))));
+
+		assertThat(responseBody.getMetrics(), hasSize(1));
+
+		final Metric requestedMetric = responseBody.getMetrics().iterator().next();
+		assertThat(requestedMetric.getId(), equalTo(TEST_METRIC_NAME));
+		assertThat(requestedMetric.getValue(), equalTo(Integer.toString(TEST_METRIC_VALUE)));
+	}
+
+	/**
+	 * Requesting an unknown metric through the filter parameter yields an empty response.
+	 */
+	@Test
+	public void testReturnEmptyListIfRequestedMetricIsUnknown() throws Exception {
+		final MetricCollectionResponseBody responseBody = requestMetrics(
+			new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				new TestMessageParameters(),
+				Collections.emptyMap(),
+				Collections.singletonMap(METRICS_FILTER_QUERY_PARAM, Collections.singletonList("unknown_metric"))));
+
+		assertThat(responseBody.getMetrics(), empty());
+	}
+
+	/**
+	 * Passes the request to the handler under test, asserts that the returned future is already
+	 * done, and returns the result of that future.
+	 */
+	private MetricCollectionResponseBody requestMetrics(
+			HandlerRequest<EmptyRequestBody, TestMessageParameters> request) throws Exception {
+
+		final CompletableFuture<MetricCollectionResponseBody> responseFuture =
+			testMetricsHandler.handleRequest(request, mockDispatcherGateway);
+
+		assertTrue(responseFuture.isDone());
+		return responseFuture.get();
+	}
+
+	/**
+	 * Handler under test. It resolves the JobManager metric store unless
+	 * {@link #returnComponentMetricStore} is switched off.
+	 */
+	private static class TestMetricsHandler extends AbstractMetricsHandler<TestMessageParameters> {
+
+		private boolean returnComponentMetricStore = true;
+
+		private TestMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> headers,
+			MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, TestMessageParameters> messageHeaders,
+			MetricFetcher metricFetcher) {
+
+			super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders, metricFetcher);
+		}
+
+		@Nullable
+		@Override
+		protected MetricStore.ComponentMetricStore getComponentMetricStore(
+			HandlerRequest<EmptyRequestBody, TestMessageParameters> request,
+			MetricStore metricStore) {
+
+			if (!returnComponentMetricStore) {
+				return null;
+			}
+			return metricStore.getJobManager();
+		}
+	}
+
+	/**
+	 * Headers for the handler under test, bound to the root URL.
+	 */
+	private static class TestMetricsHeaders extends AbstractMetricsHeaders<TestMessageParameters> {
+
+		@Override
+		public TestMessageParameters getUnresolvedMessageParameters() {
+			return new TestMessageParameters();
+		}
+
+		@Override
+		public String getTargetRestEndpointURL() {
+			return "/";
+		}
+	}
+
+	/**
+	 * Message parameters without path parameters and with the metrics filter as the only query
+	 * parameter.
+	 */
+	private static class TestMessageParameters extends MessageParameters {
+
+		@Override
+		public Collection<MessagePathParameter<?>> getPathParameters() {
+			return Collections.emptyList();
+		}
+
+		@Override
+		public Collection<MessageQueryParameter<?>> getQueryParameters() {
+			return Collections.singletonList(new MetricsFilterParameter());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
new file mode 100644
index 0000000..82d331e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link JobVertexMetricsHandler}, driven by the shared test in
+ * {@link MetricsHandlerTestBase}.
+ */
+public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVertexMetricsHandler> {
+
+	private static final String TEST_JOB_ID = new JobID().toString();
+
+	private static final String TEST_VERTEX_ID = new JobVertexID().toString();
+
+	private static final int TEST_SUBTASK_INDEX = 1;
+
+	@Override
+	JobVertexMetricsHandler getMetricsHandler() {
+		return new JobVertexMetricsHandler(
+			TEST_REST_ADDRESS,
+			leaderRetriever,
+			TIMEOUT,
+			TEST_HEADERS,
+			mockMetricFetcher);
+	}
+
+	@Override
+	QueryScopeInfo getQueryScopeInfo() {
+		return new QueryScopeInfo.TaskQueryScopeInfo(
+			TEST_JOB_ID,
+			TEST_VERTEX_ID,
+			TEST_SUBTASK_INDEX);
+	}
+
+	@Override
+	Map<String, String> getPathParameters() {
+		final Map<String, String> jobAndVertex = new HashMap<>();
+		jobAndVertex.put("jobid", TEST_JOB_ID);
+		jobAndVertex.put("vertexid", TEST_VERTEX_ID);
+		return jobAndVertex;
+	}
+
+	/**
+	 * The expected id is the metric name prefixed with the subtask index and a dot.
+	 */
+	@Override
+	String getExpectedIdForMetricName(final String metricName) {
+		return TEST_SUBTASK_INDEX + "." + metricName;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
new file mode 100644
index 0000000..05a1163
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Base class for unit tests of {@link AbstractMetricsHandler} subclasses. It registers one test
+ * counter under the scope supplied by the concrete test and checks that the handler lists it.
+ */
+public abstract class MetricsHandlerTestBase<T extends
+	AbstractMetricsHandler> extends TestLogger {
+
+	private static final String TEST_METRIC_NAME = "test_counter";
+
+	private static final int TEST_METRIC_VALUE = 1000;
+
+	static final CompletableFuture<String> TEST_REST_ADDRESS =
+		CompletableFuture.completedFuture("localhost:12345");
+
+	static final Time TIMEOUT = Time.milliseconds(50);
+
+	static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
+
+	@Mock
+	MetricFetcher mockMetricFetcher;
+
+	GatewayRetriever<DispatcherGateway> leaderRetriever;
+
+	@Mock
+	private DispatcherGateway mockDispatcherGateway;
+
+	private T metricsHandler;
+
+	private Map<String, String> pathParameters;
+
+	@Before
+	public void setUp() {
+		MockitoAnnotations.initMocks(this);
+
+		// The retriever must exist before getMetricsHandler() runs, as subclasses pass it on.
+		leaderRetriever = new GatewayRetriever<DispatcherGateway>() {
+			@Override
+			public CompletableFuture<DispatcherGateway> getFuture() {
+				return CompletableFuture.completedFuture(mockDispatcherGateway);
+			}
+		};
+		pathParameters = getPathParameters();
+		metricsHandler = getMetricsHandler();
+
+		final MetricDump.CounterDump counter = new MetricDump.CounterDump(
+			getQueryScopeInfo(),
+			TEST_METRIC_NAME,
+			TEST_METRIC_VALUE);
+		final MetricStore store = new MetricStore();
+		store.add(counter);
+		when(mockMetricFetcher.getMetricStore()).thenReturn(store);
+	}
+
+	/**
+	 * Tests that the metric named {@link #TEST_METRIC_NAME} is found in the
+	 * {@link MetricStore.ComponentMetricStore} that the handler obtains from
+	 * {@link AbstractMetricsHandler#getComponentMetricStore(HandlerRequest, MetricStore)}.
+	 */
+	@Test
+	public void testGetMetric() throws Exception {
+		@SuppressWarnings("unchecked")
+		final CompletableFuture<MetricCollectionResponseBody> responseFuture =
+			metricsHandler.handleRequest(
+				new HandlerRequest<>(
+					EmptyRequestBody.getInstance(),
+					metricsHandler.getMessageHeaders().getUnresolvedMessageParameters(),
+					pathParameters,
+					Collections.emptyMap()),
+				mockDispatcherGateway);
+
+		assertTrue(responseFuture.isDone());
+
+		final MetricCollectionResponseBody responseBody = responseFuture.get();
+		assertThat(responseBody.getMetrics(), hasSize(1));
+
+		final Metric listedMetric = responseBody.getMetrics().iterator().next();
+		assertThat(listedMetric.getId(), equalTo(getExpectedIdForMetricName(TEST_METRIC_NAME)));
+	}
+
+	/**
+	 * Returns instance under test.
+	 */
+	abstract T getMetricsHandler();
+
+	/**
+	 * Returns the scope under which the test counter is added to the metric store.
+	 */
+	abstract QueryScopeInfo getQueryScopeInfo();
+
+	/**
+	 * Returns the path parameters that are sent with the test request.
+	 */
+	abstract Map<String, String> getPathParameters();
+
+	/**
+	 * Returns the expected metric id for a given metric name. By default the metric name is
+	 * returned without any modifications.
+	 *
+	 * @param metricName The metric name.
+	 * @return The id of the metric name possibly with additional information, e.g., subtask index
+	 * as a prefix.
+	 */
+	String getExpectedIdForMetricName(final String metricName) {
+		return metricName;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
index bbdbf98..2604bd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
@@ -57,7 +57,17 @@ public abstract class RestResponseMarshallingTestBase<R extends ResponseBody> ex
 		JsonNode json = objectMapper.valueToTree(expected);
 
 		final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass());
-		Assert.assertEquals(expected, unmarshalled);
+		assertOriginalEqualsToUnmarshalled(expected, unmarshalled);
+	}
+
+	/**
+	 * Asserts equality of the original and the unmarshalled response; subclasses may override.
+	 *
+	 * @param expected the original response instance
+	 * @param actual   the instance obtained by marshalling and unmarshalling the original
+	 */
+	protected void assertOriginalEqualsToUnmarshalled(R expected, R actual) {
+		Assert.assertEquals(expected, actual);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
new file mode 100644
index 0000000..51a6ff8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AbstractMetricsHeaders}.
+ */
+public class AbstractMetricsHeadersTest extends TestLogger {
+
+	private AbstractMetricsHeaders<EmptyMessageParameters> metricsHandlerHeaders;
+
+	@Before
+	public void setUp() throws Exception {
+		metricsHandlerHeaders = new AbstractMetricsHeaders<EmptyMessageParameters>() {
+			@Override
+			public EmptyMessageParameters getUnresolvedMessageParameters() {
+				return EmptyMessageParameters.getInstance();
+			}
+
+			@Override
+			public String getTargetRestEndpointURL() {
+				return "/";
+			}
+		};
+	}
+
+	@Test
+	public void testHttpMethod() {
+		assertThat(metricsHandlerHeaders.getHttpMethod(), equalTo(HttpMethodWrapper.GET));
+	}
+
+	@Test
+	public void testResponseStatus() {
+		assertThat(metricsHandlerHeaders.getResponseStatusCode(), equalTo(HttpResponseStatus.OK));
+	}
+
+	@Test
+	public void testRequestClass() {
+		assertThat(metricsHandlerHeaders.getRequestClass(), equalTo(EmptyRequestBody.class));
+	}
+
+	@Test
+	public void testResponseClass() {
+		assertThat(metricsHandlerHeaders.getResponseClass(), equalTo(MetricCollectionResponseBody.class));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java
new file mode 100644
index 0000000..dfb2682
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricCollectionResponseBodyTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+
+/**
+ * Marshalling tests for {@link MetricCollectionResponseBody}.
+ */
+public class MetricCollectionResponseBodyTest extends
+	RestResponseMarshallingTestBase<MetricCollectionResponseBody> {
+
+	private static final String TEST_METRIC_NAME = "metric1";
+
+	private static final String TEST_METRIC_VALUE = "1000";
+
+	@Override
+	protected Class<MetricCollectionResponseBody> getTestResponseClass() {
+		return MetricCollectionResponseBody.class;
+	}
+
+	@Override
+	protected MetricCollectionResponseBody getTestResponseInstance() {
+		final Metric metricWithValue = new Metric(TEST_METRIC_NAME, TEST_METRIC_VALUE);
+		return new MetricCollectionResponseBody(Collections.singleton(metricWithValue));
+	}
+
+	/**
+	 * Checks the id and the value of the single unmarshalled metric against the test constants
+	 * instead of comparing the two response bodies with {@code equals}.
+	 */
+	@Override
+	protected void assertOriginalEqualsToUnmarshalled(
+			MetricCollectionResponseBody expected,
+			MetricCollectionResponseBody actual) {
+
+		assertThat(actual.getMetrics(), hasSize(1));
+
+		final Metric unmarshalledMetric = actual.getMetrics().iterator().next();
+		assertThat(unmarshalledMetric.getId(), equalTo(TEST_METRIC_NAME));
+		assertThat(unmarshalledMetric.getValue(), equalTo(TEST_METRIC_VALUE));
+	}
+
+	/**
+	 * A metric without a value must serialize without a {@code "value"} field, and the response
+	 * body must serialize without a {@code "metrics"} field.
+	 */
+	@Test
+	public void testNullValueNotSerialized() throws Exception {
+		final MetricCollectionResponseBody bodyWithoutValue = new MetricCollectionResponseBody(
+			Collections.singleton(new Metric(TEST_METRIC_NAME)));
+
+		final String json = RestMapperUtils.getStrictObjectMapper()
+			.writeValueAsString(bodyWithoutValue);
+
+		assertThat(json, not(containsString("\"value\"")));
+		assertThat(json, not(containsString("\"metrics\"")));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1119c48c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
new file mode 100644
index 0000000..2756a65
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the requisiteness and the value conversions of {@link MetricsFilterParameter}.
+ */
+public class MetricsFilterParameterTest extends TestLogger {
+
+	private MetricsFilterParameter filterParameter;
+
+	@Before
+	public void setUp() {
+		filterParameter = new MetricsFilterParameter();
+	}
+
+	@Test
+	public void testIsOptionalParameter() {
+		final boolean mandatory = filterParameter.isMandatory();
+		assertFalse(mandatory);
+	}
+
+	/**
+	 * Both conversion directions must return the input unchanged.
+	 */
+	@Test
+	public void testConversions() {
+		final String toValue = filterParameter.convertStringToValue("test");
+		assertThat(toValue, equalTo("test"));
+
+		final String fromString = filterParameter.convertValueFromString("test");
+		assertThat(fromString, equalTo("test"));
+	}
+
+}