You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:42 UTC

[01/11] flink git commit: [hotfix][metrics] Make MessageParameter constructor protected

Repository: flink
Updated Branches:
  refs/heads/release-1.5 c6d45b922 -> a241d2af7


[hotfix][metrics] Make MessageParameter constructor protected


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a562e5d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a562e5d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a562e5d4

Branch: refs/heads/release-1.5
Commit: a562e5d40bd6f7d02d4d39752ad140551c25c854
Parents: 2cc77f9
Author: zentol <ch...@apache.org>
Authored: Mon Mar 26 14:41:03 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rest/messages/MessageParameter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a562e5d4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index a615e96..b8485e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -42,7 +42,7 @@ public abstract class MessageParameter<X> {
 	private final String key;
 	private X value;
 
-	MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
+	protected MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
 		this.key = Preconditions.checkNotNull(key);
 		this.requisiteness = Preconditions.checkNotNull(requisiteness);
 	}


[11/11] flink git commit: [FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address

Posted by ch...@apache.org.
[FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address

-add back known logging messages about webUI address
-do not set random port in local stream environment

This closes #5814.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a241d2af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a241d2af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a241d2af

Branch: refs/heads/release-1.5
Commit: a241d2af7d640407974dfa460f4693d1f75a5ff2
Parents: 39e9e19
Author: zentol <ch...@apache.org>
Authored: Wed Apr 4 10:44:59 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:33 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/java/ExecutionEnvironment.java   |  6 ++++++
 .../flink/runtime/webmonitor/WebMonitorEndpoint.java      | 10 +++++++++-
 .../streaming/api/environment/LocalStreamEnvironment.java |  4 +++-
 .../api/environment/StreamExecutionEnvironment.java       |  6 ++++++
 4 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1ce2221..3ea99ea 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -1125,6 +1126,11 @@ public abstract class ExecutionEnvironment {
 
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
+		if (!conf.contains(RestOptions.REST_PORT)) {
+			// explicitly set this option so that it's not set to 0 later
+			conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue());
+		}
+
 		return createLocalEnvironment(conf, -1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index fb663ad..0ea7550 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -163,6 +163,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	private final FatalErrorHandler fatalErrorHandler;
 
+	private boolean hasWebUI = false;
+
 	public WebMonitorEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<? extends T> leaderRetriever,
@@ -606,7 +608,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));
 
 		optWebContent.ifPresent(
-			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
+			webContent -> {
+				handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent));
+				hasWebUI = true;
+			});
 
 		// load the log and stdout file handler for the main cluster component
 		final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);
@@ -679,6 +684,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 	@Override
 	public void startInternal() throws Exception {
 		leaderElectionService.start(this);
+		if (hasWebUI) {
+			log.info("Web frontend listening at {}.", getRestBaseUrl());
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 935c78e..b9c76b2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -99,7 +99,9 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.configuration);
 
-		configuration.setInteger(RestOptions.REST_PORT, 0);
+		if (!configuration.contains(RestOptions.REST_PORT)) {
+			configuration.setInteger(RestOptions.REST_PORT, 0);
+		}
 
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
 			.setConfiguration(configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index fa81c27..7372fe8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -48,6 +48,7 @@ import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -1679,6 +1680,11 @@ public abstract class StreamExecutionEnvironment {
 
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
+		if (!conf.contains(RestOptions.REST_PORT)) {
+			// explicitly set this option so that it's not set to 0 later
+			conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue());
+		}
+
 		return createLocalEnvironment(defaultLocalParallelism, conf);
 	}
 


[08/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
new file mode 100644
index 0000000..4453ee2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Test base for handlers that extend {@link AbstractAggregatingMetricsHandler}.
+ */
+public abstract class AggregatingMetricsHandlerTestBase<
+	H extends AbstractAggregatingMetricsHandler<P>,
+	P extends AbstractAggregatedMetricsParameters<?>>
+	extends TestLogger {
+
+	private static final CompletableFuture<String> TEST_REST_ADDRESS;
+	private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY;
+	private static final GatewayRetriever<DispatcherGateway> LEADER_RETRIEVER;
+	private static final Time TIMEOUT = Time.milliseconds(50);
+	private static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
+	private static final Executor EXECUTOR = TestingUtils.defaultExecutor();
+
+	static {
+		TEST_REST_ADDRESS = CompletableFuture.completedFuture("localhost:12345");
+
+		MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class);
+
+		LEADER_RETRIEVER = new GatewayRetriever<DispatcherGateway>() {
+			@Override
+			public CompletableFuture<DispatcherGateway> getFuture() {
+				return CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY);
+			}
+		};
+	}
+
+	private H handler;
+	private MetricStore store;
+	private Map<String, String> pathParameters;
+
+	@Before
+	public void setUp() throws Exception {
+		MetricFetcher<RestfulGateway> fetcher = new MetricFetcher<RestfulGateway>(
+			mock(GatewayRetriever.class),
+			mock(MetricQueryServiceRetriever.class),
+			Executors.directExecutor(),
+			TestingUtils.TIMEOUT());
+		store = fetcher.getMetricStore();
+
+		Collection<MetricDump> metricDumps = getMetricDumps();
+		for (MetricDump dump : metricDumps) {
+			store.add(dump);
+		}
+
+		handler = getHandler(
+			TEST_REST_ADDRESS,
+			LEADER_RETRIEVER,
+			TIMEOUT,
+			TEST_HEADERS,
+			EXECUTOR,
+			fetcher);
+		pathParameters = getPathParameters();
+	}
+
+	protected Map<String, String> getPathParameters() {
+		return Collections.emptyMap();
+	}
+
+	protected abstract Tuple2<String, List<String>> getFilter();
+
+	protected abstract Collection<MetricDump> getMetricDumps();
+
+	protected abstract H getHandler(
+		CompletableFuture<String> localRestAddress,
+		GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+		Time timeout,
+		Map<String, String> responseHeaders,
+		Executor executor,
+		MetricFetcher<?> fetcher
+	);
+
+	@Test
+	public void getStores() throws Exception {
+		{ // test without filter
+			HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				handler.getMessageHeaders().getUnresolvedMessageParameters(),
+				pathParameters,
+				Collections.emptyMap()
+			);
+			Collection<? extends MetricStore.ComponentMetricStore> subStores = handler.getStores(store, request);
+
+			assertEquals(3, subStores.size());
+
+			List<String> sortedMetrics1 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric1"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(2, sortedMetrics1.size());
+
+			assertEquals("1", sortedMetrics1.get(0));
+			assertEquals("3", sortedMetrics1.get(1));
+
+			List<String> sortedMetrics2 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric2"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(1, sortedMetrics2.size());
+
+			assertEquals("5", sortedMetrics2.get(0));
+		}
+
+		{ // test with filter
+			Tuple2<String, List<String>> filter = getFilter();
+			Map<String, List<String>> queryParameters = new HashMap<>(4);
+			queryParameters.put(filter.f0, filter.f1);
+			HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+				EmptyRequestBody.getInstance(),
+				handler.getMessageHeaders().getUnresolvedMessageParameters(),
+				pathParameters,
+				queryParameters
+			);
+			Collection<? extends MetricStore.ComponentMetricStore> subStores = handler.getStores(store, request);
+
+			assertEquals(2, subStores.size());
+
+			List<String> sortedMetrics1 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric1"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(1, sortedMetrics1.size());
+
+			assertEquals("1", sortedMetrics1.get(0));
+
+			List<String> sortedMetrics2 = subStores.stream()
+				.map(subStore -> subStore.getMetric("abc.metric2"))
+				.filter(Objects::nonNull)
+				.sorted()
+				.collect(Collectors.toList());
+
+			assertEquals(1, sortedMetrics2.size());
+
+			assertEquals("5", sortedMetrics2.get(0));
+		}
+	}
+
+	@Test
+	public void testListMetrics() throws Exception {
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			Collections.emptyMap()
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		List<String> availableMetrics = response.getMetrics().stream()
+			.map(AggregatedMetric::getId)
+			.sorted()
+			.collect(Collectors.toList());
+
+		assertEquals(2, availableMetrics.size());
+		assertEquals("abc.metric1", availableMetrics.get(0));
+		assertEquals("abc.metric2", availableMetrics.get(1));
+	}
+
+	@Test
+	public void testMinAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("min"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+		assertNull(aggregatedMetric.getMax());
+		assertNull(aggregatedMetric.getSum());
+		assertNull(aggregatedMetric.getAvg());
+	}
+
+	@Test
+	public void testMaxAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("max"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+		assertNull(aggregatedMetric.getMin());
+		assertNull(aggregatedMetric.getSum());
+		assertNull(aggregatedMetric.getAvg());
+	}
+
+	@Test
+	public void testSumAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("sum"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(4.0, aggregatedMetric.getSum(), 0.1);
+		assertNull(aggregatedMetric.getMin());
+		assertNull(aggregatedMetric.getMax());
+		assertNull(aggregatedMetric.getAvg());
+	}
+
+	@Test
+	public void testAvgAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Collections.singletonList("avg"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+		assertNull(aggregatedMetric.getMin());
+		assertNull(aggregatedMetric.getMax());
+		assertNull(aggregatedMetric.getSum());
+	}
+
+	@Test
+	public void testMultipleAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+		queryParams.put("agg", Arrays.asList("min", "max", "avg"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+		assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+		assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+		assertNull(aggregatedMetric.getSum());
+	}
+
+	@Test
+	public void testDefaultAggregation() throws Exception {
+		Map<String, List<String>> queryParams = new HashMap<>(4);
+		queryParams.put("get", Collections.singletonList("abc.metric1"));
+
+		HandlerRequest<EmptyRequestBody, P> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			handler.getMessageHeaders().getUnresolvedMessageParameters(),
+			pathParameters,
+			queryParams
+		);
+
+		AggregatedMetricsResponseBody response = handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY)
+			.get();
+
+		Collection<AggregatedMetric> aggregatedMetrics = response.getMetrics();
+
+		assertEquals(1, aggregatedMetrics.size());
+		AggregatedMetric aggregatedMetric = aggregatedMetrics.iterator().next();
+
+		assertEquals("abc.metric1", aggregatedMetric.getId());
+		assertEquals(1.0, aggregatedMetric.getMin(), 0.1);
+		assertEquals(3.0, aggregatedMetric.getMax(), 0.1);
+		assertEquals(2.0, aggregatedMetric.getAvg(), 0.1);
+		assertEquals(4.0, aggregatedMetric.getSum(), 0.1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
new file mode 100644
index 0000000..902570c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingSubtasksMetricsHandler}.
+ */
+public class AggregatingSubtasksMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingSubtasksMetricsHandler, AggregatedSubtaskMetricsParameters> {
+
+	private static final JobID JOB_ID = JobID.generate();
+	private static final JobVertexID TASK_ID = new JobVertexID();
+
+	@Override
+	protected Tuple2<String, List<String>> getFilter() {
+		return Tuple2.of("subtasks", Arrays.asList("1", "3"));
+	}
+
+	@Override
+	protected Map<String, String> getPathParameters() {
+		Map<String, String> pathParameters = new HashMap<>(4);
+		pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
+		pathParameters.put(JobVertexIdPathParameter.KEY, TASK_ID.toString());
+		return pathParameters;
+	}
+
+	@Override
+	protected Collection<MetricDump> getMetricDumps() {
+		Collection<MetricDump> dumps = new ArrayList<>(3);
+		QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 1, "abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1);
+		dumps.add(cd1);
+
+		QueryScopeInfo.TaskQueryScopeInfo task2 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 2, "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3);
+		dumps.add(cd2);
+
+		QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo(JOB_ID.toString(), TASK_ID.toString(), 3, "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5);
+		dumps.add(cd3);
+
+		return dumps;
+	}
+
+	@Override
+	protected AggregatingSubtasksMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+		return new AggregatingSubtasksMetricsHandler(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			fetcher
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
new file mode 100644
index 0000000..fb7a51b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandlerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingTaskManagersMetricsHandler}.
+ */
+public class AggregatingTaskManagersMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingTaskManagersMetricsHandler, AggregateTaskManagerMetricsParameters> {
+
+	private static final ResourceID TM_ID_1 = ResourceID.generate();
+	private static final ResourceID TM_ID_2 = ResourceID.generate();
+	private static final ResourceID TM_ID_3 = ResourceID.generate();
+
+	@Override
+	protected Tuple2<String, List<String>> getFilter() {
+		return Tuple2.of("taskmanagers", Arrays.asList(TM_ID_1.toString(), TM_ID_3.toString()));
+	}
+
+	@Override
+	protected Collection<MetricDump> getMetricDumps() {
+		Collection<MetricDump> dumps = new ArrayList<>(3);
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm1 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_1.toString(), "abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(tm1, "metric1", 1);
+		dumps.add(cd1);
+
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm2 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_2.toString(), "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm2, "metric1", 3);
+		dumps.add(cd2);
+
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm3 = new QueryScopeInfo.TaskManagerQueryScopeInfo(TM_ID_3.toString(), "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(tm3, "metric2", 5);
+		dumps.add(cd3);
+
+		return dumps;
+	}
+
+	@Override
+	protected AggregatingTaskManagersMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+		return new AggregatingTaskManagersMetricsHandler(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			fetcher
+		);
+	}
+}


[03/11] flink git commit: [FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration

Posted by ch...@apache.org.
[FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5423d0e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5423d0e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5423d0e2

Branch: refs/heads/release-1.5
Commit: 5423d0e2ebed8413b1200424689fab9cae5bfef5
Parents: a0a24b2
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 11:00:45 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/test/util/MiniClusterResource.java  | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5423d0e2/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8a05750..531a3c7 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -25,8 +25,10 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -67,6 +69,8 @@ public class MiniClusterResource extends ExternalResource {
 
 	private ClusterClient<?> clusterClient;
 
+	private Configuration restClusterClientConfig;
+
 	private int numberSlots = -1;
 
 	private TestEnvironment executionEnvironment;
@@ -117,6 +121,10 @@ public class MiniClusterResource extends ExternalResource {
 		return clusterClient;
 	}
 
+	public Configuration getClientConfiguration() {
+		return restClusterClientConfig;
+	}
+
 	public TestEnvironment getTestEnvironment() {
 		return executionEnvironment;
 	}
@@ -194,6 +202,9 @@ public class MiniClusterResource extends ExternalResource {
 		if (enableClusterClient) {
 			clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
 		}
+		Configuration restClientConfig = new Configuration();
+		restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
+		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
 	}
 
 	private void startMiniCluster() throws Exception {
@@ -229,6 +240,10 @@ public class MiniClusterResource extends ExternalResource {
 		if (enableClusterClient) {
 			clusterClient = new MiniClusterClient(configuration, miniCluster);
 		}
+		Configuration restClientConfig = new Configuration();
+		restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
+		restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
 	}
 
 	/**


[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase to flip6

Posted by ch...@apache.org.
[FLINK-8961][tests] Port JobRetrievalITCase to flip6

This closes #5730.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cc77f9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cc77f9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cc77f9f

Branch: refs/heads/release-1.5
Commit: 2cc77f9f6e999238ae9dd7d24712e5d7a397f4cb
Parents: 5423d0e
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 15:19:47 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../test/example/client/JobRetrievalITCase.java | 121 +++++++-------
 .../client/LegacyJobRetrievalITCase.java        | 162 +++++++++++++++++++
 2 files changed, 224 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cc77f9f/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 57198c0..6b747e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -21,30 +21,27 @@ package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -52,23 +49,41 @@ import static org.junit.Assert.fail;
 /**
  * Tests retrieval of a job from a running Flink cluster.
  */
+@Category(New.class)
 public class JobRetrievalITCase extends TestLogger {
 
 	private static final Semaphore lock = new Semaphore(1);
 
-	private static FlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void before() {
-		Configuration configuration = new Configuration();
-		cluster = new TestingCluster(configuration, false);
-		cluster.start();
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			new Configuration(),
+			1,
+			4
+		),
+		MiniClusterResource.MiniClusterType.NEW
+	);
+
+	private RestClusterClient<StandaloneClusterId> client;
+
+	@Before
+	public void setUp() throws Exception {
+		final Configuration clientConfig = new Configuration();
+		clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+		clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
+		clientConfig.addAll(CLUSTER.getClientConfiguration());
+
+		client = new RestClusterClient<>(
+			clientConfig,
+			StandaloneClusterId.getInstance()
+		);
 	}
 
-	@AfterClass
-	public static void after() {
-		cluster.stop();
-		cluster = null;
+	@After
+	public void tearDown() {
+		if (client != null) {
+			client.shutdown();
+		}
 	}
 
 	@Test
@@ -80,64 +95,52 @@ public class JobRetrievalITCase extends TestLogger {
 
 		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
 
-		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
-
 		// acquire the lock to make sure that the job cannot complete until the job client
 		// has been attached in resumingThread
 		lock.acquire();
-		client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
-		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final Thread resumingThread = new Thread(new Runnable() {
+		client.setDetached(true);
+		client.submitJob(jobGraph, JobRetrievalITCase.class.getClassLoader());
+
+		final CheckedThread resumingThread = new CheckedThread("Flink-Job-Retriever") {
 			@Override
-			public void run() {
-				try {
-					assertNotNull(client.retrieveJob(jobID));
-				} catch (Throwable e) {
-					error.set(e);
-				}
+			public void go() throws Exception {
+				assertNotNull(client.requestJobResult(jobID).get());
 			}
-		}, "Flink-Job-Retriever");
-
-		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
-		final ActorSystem actorSystem = actorSystemSeq.last();
-		JavaTestKit testkit = new JavaTestKit(actorSystem);
+		};
 
-		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
-		// wait until client connects
-		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
-		// confirm registration
-		testkit.expectMsgEquals(true);
+		// wait until the cluster reports at least one job, i.e. the submitted job is known to it
+		while (client.listJobs().get().isEmpty()) {
+			Thread.sleep(50);
+		}
 
 		// kick off resuming
 		resumingThread.start();
 
 		// wait for client to connect
-		testkit.expectMsgAllOf(
-			TestingJobManagerMessages.getClientConnected(),
-			TestingJobManagerMessages.getClassLoadingPropsDelivered());
+		while (resumingThread.getState() != Thread.State.WAITING) {
+			Thread.sleep(10);
+		}
 
 		// client has connected, we can release the lock
 		lock.release();
 
-		resumingThread.join();
-
-		Throwable exception = error.get();
-		if (exception != null) {
-			throw new AssertionError(exception);
-		}
+		resumingThread.sync();
 	}
 
 	@Test
 	public void testNonExistingJobRetrieval() throws Exception {
 		final JobID jobID = new JobID();
-		ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration());
 
 		try {
-			client.retrieveJob(jobID);
+			client.requestJobResult(jobID).get();
 			fail();
-		} catch (JobRetrievalException ignored) {
-			// this is what we want
+		} catch (Exception exception) {
+			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(exception,
+				candidate -> candidate.getMessage() != null && candidate.getMessage().contains("Could not find Flink job"));
+			if (!expectedCause.isPresent()) {
+				throw exception;
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc77f9f/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
new file mode 100644
index 0000000..174c90e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests retrieval of a job from a running legacy (pre-FLIP-6) Flink cluster.
+ */
+public class LegacyJobRetrievalITCase extends TestLogger {
+
+	private static final Semaphore lock = new Semaphore(1);
+
+	private static FlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void before() {
+		Configuration configuration = new Configuration();
+		cluster = new TestingCluster(configuration, false);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void after() {
+		cluster.stop();
+		cluster = null;
+	}
+
+	@Test
+	public void testJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+
+		final JobVertex imalock = new JobVertex("imalock");
+		imalock.setInvokableClass(SemaphoreInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
+
+		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
+
+		// acquire the lock to make sure that the job cannot complete until the job client
+		// has been attached in resumingThread
+		lock.acquire();
+		client.runDetached(jobGraph, LegacyJobRetrievalITCase.class.getClassLoader());
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final Thread resumingThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					assertNotNull(client.retrieveJob(jobID));
+				} catch (Throwable e) {
+					error.set(e);
+				}
+			}
+		}, "Flink-Job-Retriever");
+
+		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
+		final ActorSystem actorSystem = actorSystemSeq.last();
+		JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+		// ask the JobManager to notify the test kit once a client connects
+		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
+		// confirm registration
+		testkit.expectMsgEquals(true);
+
+		// kick off resuming
+		resumingThread.start();
+
+		// wait for client to connect
+		testkit.expectMsgAllOf(
+			TestingJobManagerMessages.getClientConnected(),
+			TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
+		// client has connected, we can release the lock
+		lock.release();
+
+		resumingThread.join();
+
+		Throwable exception = error.get();
+		if (exception != null) {
+			throw new AssertionError(exception);
+		}
+	}
+
+	@Test
+	public void testNonExistingJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+		ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration());
+
+		try {
+			client.retrieveJob(jobID);
+			fail();
+		} catch (JobRetrievalException ignored) {
+			// this is what we want
+		}
+	}
+
+	/**
+	 * Invokable that waits on {@link #lock} to be released and finishes afterwards.
+	 *
+	 * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with this!
+	 */
+	public static class SemaphoreInvokable extends AbstractInvokable {
+
+		public SemaphoreInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			lock.acquire();
+			lock.release();
+		}
+	}
+
+}


[04/11] flink git commit: [hotfix][tests] Add MCR constructor accepting configuration and type

Posted by ch...@apache.org.
[hotfix][tests] Add MCR constructor accepting configuration and type


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0a24b27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0a24b27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0a24b27

Branch: refs/heads/release-1.5
Commit: a0a24b277882382d3c0712ec8fea7c5166a86f9a
Parents: 47909f4
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 11:00:23 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/test/util/MiniClusterResource.java   | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0a24b27/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 9b0ac77..8a05750 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -77,6 +77,12 @@ public class MiniClusterResource extends ExternalResource {
 
 	public MiniClusterResource(
 			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+			final MiniClusterType miniClusterType) {
+		this(miniClusterResourceConfiguration, miniClusterType, false);
+	}
+
+	public MiniClusterResource(
+			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
 			final boolean enableClusterClient) {
 		this(
 			miniClusterResourceConfiguration,


[10/11] flink git commit: [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions

Posted by ch...@apache.org.
[hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f579f745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f579f745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f579f745

Branch: refs/heads/release-1.5
Commit: f579f745dc3868c1807a814695624d98b53f6d50
Parents: a562e5d
Author: zentol <ch...@apache.org>
Authored: Mon Mar 26 14:41:25 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:33 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/rest/messages/MessageQueryParameter.java       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f579f745/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
index 180f011..6799df1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
@@ -35,7 +35,7 @@ public abstract class MessageQueryParameter<X> extends MessageParameter<List<X>>
 	}
 
 	@Override
-	public List<X> convertFromString(String values) {
+	public List<X> convertFromString(String values) throws ConversionException {
 		String[] splitValues = values.split(",");
 		List<X> list = new ArrayList<>();
 		for (String value : splitValues) {
@@ -50,7 +50,7 @@ public abstract class MessageQueryParameter<X> extends MessageParameter<List<X>>
 	 * @param value string representation of parameter value
 	 * @return parameter value
 	 */
-	public abstract X convertStringToValue(String value);
+	public abstract X convertStringToValue(String value) throws ConversionException;
 
 	@Override
 	public String convertToString(List<X> values) {


[09/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

Posted by ch...@apache.org.
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

This closes #5805.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23d45436
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23d45436
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23d45436

Branch: refs/heads/release-1.5
Commit: 23d454364d208d3ce8a55422edaaca365a1c9c79
Parents: f579f74
Author: zentol <ch...@apache.org>
Authored: Wed Mar 28 12:52:07 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:33 2018 +0200

----------------------------------------------------------------------
 .../AbstractAggregatingMetricsHandler.java      | 300 ++++++++++++++
 .../metrics/AggregatingJobsMetricsHandler.java  |  77 ++++
 .../AggregatingSubtasksMetricsHandler.java      | 119 ++++++
 .../AggregatingTaskManagersMetricsHandler.java  |  77 ++++
 .../handler/job/metrics/DoubleAccumulator.java  | 257 ++++++++++++
 .../AbstractAggregatedMetricsHeaders.java       |  50 +++
 .../AbstractAggregatedMetricsParameters.java    |  48 +++
 .../AggregateTaskManagerMetricsParameters.java  |  38 ++
 .../metrics/AggregatedJobMetricsHeaders.java    |  44 +++
 .../metrics/AggregatedJobMetricsParameters.java |  39 ++
 .../messages/job/metrics/AggregatedMetric.java  | 118 ++++++
 .../metrics/AggregatedMetricsResponseBody.java  | 112 ++++++
 .../AggregatedSubtaskMetricsHeaders.java        |  47 +++
 .../AggregatedSubtaskMetricsParameters.java     |  51 +++
 .../AggregatedTaskManagerMetricsHeaders.java    |  44 +++
 .../job/metrics/JobsFilterQueryParameter.java   |  48 +++
 .../metrics/MetricsAggregationParameter.java    |  58 +++
 .../metrics/SubtasksFilterQueryParameter.java   |  41 ++
 .../TaskManagersFilterQueryParameter.java       |  42 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  33 ++
 .../AggregatingJobsMetricsHandlerTest.java      |  81 ++++
 .../AggregatingMetricsHandlerTestBase.java      | 389 +++++++++++++++++++
 .../AggregatingSubtasksMetricsHandlerTest.java  |  93 +++++
 ...gregatingTaskManagersMetricsHandlerTest.java |  82 ++++
 24 files changed, 2288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
new file mode 100644
index 0000000..338bb46
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. It returns either a list of all available metrics
+ * or the aggregated values of them across all/selected entities; subclasses select the entities via {@link #getStores}.
+ *
+ * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ] }
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+	private final Executor executor;
+	private final MetricFetcher<?> fetcher;
+
+	protected AbstractAggregatingMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			AbstractAggregatedMetricsHeaders<P> messageHeaders,
+			Executor executor,
+			MetricFetcher<?> fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+		this.executor = Preconditions.checkNotNull(executor);
+		this.fetcher = Preconditions.checkNotNull(fetcher);
+	}
+
+	@Nonnull
+	abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request);
+
+	@Override
+	protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					fetcher.update();
+					List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class);
+					List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class);
+					MetricStore store = fetcher.getMetricStore();
+
+					Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request);
+					// no metrics requested => only list the names of the metrics available in the selected stores
+					if (requestedMetrics.isEmpty()) {
+						Collection<String> list = getAvailableMetrics(stores);
+						return new AggregatedMetricsResponseBody(
+							list.stream()
+								.map(AggregatedMetric::new)
+								.collect(Collectors.toList())
+						);
+					}
+					// a factory that stays null means that the corresponding aggregation was not requested
+					DoubleAccumulator.DoubleMinimumFactory minimumFactory = null;
+					DoubleAccumulator.DoubleMaximumFactory maximumFactory = null;
+					DoubleAccumulator.DoubleAverageFactory averageFactory = null;
+					DoubleAccumulator.DoubleSumFactory sumFactory = null;
+					// by default we return all aggregations
+					if (requestedAggregations.isEmpty()) {
+						minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get();
+						maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get();
+						averageFactory = DoubleAccumulator.DoubleAverageFactory.get();
+						sumFactory = DoubleAccumulator.DoubleSumFactory.get();
+					} else {
+						for (MetricsAggregationParameter.AggregationMode aggregation : requestedAggregations) {
+							switch (aggregation) {
+								case MIN:
+									minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get();
+									break;
+								case MAX:
+									maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get();
+									break;
+								case AVG:
+									averageFactory = DoubleAccumulator.DoubleAverageFactory.get();
+									break;
+								case SUM:
+									sumFactory = DoubleAccumulator.DoubleSumFactory.get();
+									break;
+								default:
+									log.warn("Unsupported aggregation specified: {}", aggregation);
+							}
+						}
+					}
+					MetricAccumulatorFactory metricAccumulatorFactory = new MetricAccumulatorFactory(minimumFactory, maximumFactory, averageFactory, sumFactory);
+
+					return getAggregatedMetricValues(stores, requestedMetrics, metricAccumulatorFactory);
+				} catch (Exception e) {
+					log.warn("Could not retrieve metrics.", e);
+					throw new CompletionException(new RestHandlerException("Could not retrieve metrics.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Returns the names of all metrics available in the given stores, i.e. the union of the key-sets of the
+	 * metric maps of all stores. No JSON is produced here; the caller wraps the names into the response body.
+	 *
+	 * @param stores metric stores to collect the metric names from
+	 * @return distinct names of all metrics contained in the given stores
+	 */
+	private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) {
+		Set<String> uniqueMetrics = new HashSet<>(32);
+		for (MetricStore.ComponentMetricStore store : stores) {
+			uniqueMetrics.addAll(store.metrics.keySet());
+		}
+		return uniqueMetrics;
+	}
+
+	/**
+	 * Extracts the values of all requested metrics from the given metric stores and aggregates them per metric.
+	 *
+	 * @param stores metric stores to read the values from
+	 * @param requestedMetrics ids of requested metrics
+	 * @param requestedAggregationsFactories factory creating the accumulators for the requested aggregations
+	 * @return response body with the aggregated metrics; empty as soon as a requested metric has no value in any store
+	 */
+	private AggregatedMetricsResponseBody getAggregatedMetricValues(
+			Collection<? extends MetricStore.ComponentMetricStore> stores,
+			List<String> requestedMetrics,
+			MetricAccumulatorFactory requestedAggregationsFactories) {
+
+		Collection<AggregatedMetric> aggregatedMetrics = new ArrayList<>(requestedMetrics.size());
+		for (String requestedMetric : requestedMetrics) {
+			final Collection<Double> values = new ArrayList<>(stores.size());
+			try {
+				for (MetricStore.ComponentMetricStore store : stores) {
+					String stringValue = store.metrics.get(requestedMetric);
+					if (stringValue != null) {
+						values.add(Double.valueOf(stringValue));
+					}
+				}
+			} catch (NumberFormatException nfe) {
+				log.warn("The metric {} is not numeric and can't be aggregated.", requestedMetric, nfe);
+				// metric is not numeric so we can't perform aggregations => ignore it
+				continue;
+			}
+			if (!values.isEmpty()) {
+				// the first value seeds the accumulators, all remaining values are added afterwards
+				Iterator<Double> valuesIterator = values.iterator();
+				MetricAccumulator acc = requestedAggregationsFactories.get(requestedMetric, valuesIterator.next());
+				valuesIterator.forEachRemaining(acc::add);
+
+				aggregatedMetrics.add(acc.get());
+			} else {
+				return new AggregatedMetricsResponseBody(Collections.emptyList());
+			}
+		}
+		return new AggregatedMetricsResponseBody(aggregatedMetrics);
+	}
+
+	private static class MetricAccumulatorFactory {
+		// Bundles the factories of the requested aggregations; a null factory yields a null accumulator in get().
+		@Nullable
+		private final DoubleAccumulator.DoubleMinimumFactory minimumFactory;
+
+		@Nullable
+		private final DoubleAccumulator.DoubleMaximumFactory maximumFactory;
+
+		@Nullable
+		private final DoubleAccumulator.DoubleAverageFactory averageFactory;
+
+		@Nullable
+		private final DoubleAccumulator.DoubleSumFactory sumFactory;
+
+		private MetricAccumulatorFactory(
+				@Nullable DoubleAccumulator.DoubleMinimumFactory minimumFactory,
+				@Nullable DoubleAccumulator.DoubleMaximumFactory maximumFactory,
+				@Nullable DoubleAccumulator.DoubleAverageFactory averageFactory,
+				@Nullable DoubleAccumulator.DoubleSumFactory sumFactory) {
+			this.minimumFactory = minimumFactory;
+			this.maximumFactory = maximumFactory;
+			this.averageFactory = averageFactory;
+			this.sumFactory = sumFactory;
+		}
+
+		MetricAccumulator get(String metricName, double init) {
+			return new MetricAccumulator(
+				metricName,
+				minimumFactory == null ? null : minimumFactory.get(init),
+				maximumFactory == null ? null : maximumFactory.get(init),
+				averageFactory == null ? null : averageFactory.get(init),
+				sumFactory == null ? null : sumFactory.get(init)
+			);
+		}
+	}
+
+	private static class MetricAccumulator {
+		private final String metricName;
+		// each accumulator may be null; null accumulators are skipped in add() and reported as null in get()
+		@Nullable
+		private final DoubleAccumulator min;
+		@Nullable
+		private final DoubleAccumulator max;
+		@Nullable
+		private final DoubleAccumulator avg;
+		@Nullable
+		private final DoubleAccumulator sum;
+
+		private MetricAccumulator(
+				String metricName,
+				@Nullable DoubleAccumulator min,
+				@Nullable DoubleAccumulator max,
+				@Nullable DoubleAccumulator avg,
+				@Nullable DoubleAccumulator sum) {
+			this.metricName = Preconditions.checkNotNull(metricName);
+			this.min = min;
+			this.max = max;
+			this.avg = avg;
+			this.sum = sum;
+		}
+
+		void add(double value) {
+			if (min != null) {
+				min.add(value);
+			}
+			if (max != null) {
+				max.add(value);
+			}
+			if (avg != null) {
+				avg.add(value);
+			}
+			if (sum != null) {
+				sum.add(value);
+			}
+		}
+
+		AggregatedMetric get() {
+			return new AggregatedMetric(
+				metricName,
+				min == null ? null : min.getValue(),
+				max == null ? null : max.getValue(),
+				avg == null ? null : avg.getValue(),
+				sum == null ? null : sum.getValue()
+			);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
new file mode 100644
index 0000000..42928a4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobsFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across jobs, a list of all available metrics or the values
+ * for a set of metrics. Without a job filter the metric stores of all jobs are used for the aggregation.
+ *
+ * <p>Specific jobs can be selected for aggregation by specifying a comma-separated list of job IDs.
+ * {@code /metrics?get=X,Y&jobs=A,B} Job IDs for which no metric store exists are ignored.
+ */
+public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedJobMetricsParameters> {
+
+	public AggregatingJobsMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout, Map<String, String> responseHeaders,
+			Executor executor,
+			MetricFetcher<?> fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher);
+	}
+
+	@Nonnull
+	@Override
+	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedJobMetricsParameters> request) {
+		List<JobID> jobs = request.getQueryParameter(JobsFilterQueryParameter.class);
+		if (jobs.isEmpty()) {
+			return store.getJobs().values();
+		} else {
+			Collection<MetricStore.ComponentMetricStore> jobStores = new ArrayList<>(jobs.size());
+			for (JobID job : jobs) {
+				MetricStore.ComponentMetricStore jobMetricStore = store.getJobMetricStore(job.toString());
+				if (jobMetricStore != null) {
+					jobStores.add(jobMetricStore);
+				}
+			}
+			return jobStores;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
new file mode 100644
index 0000000..f95deaa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.IntStream;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics. Without a subtask filter all subtask metric stores of the task are used.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5} Single indices are accepted as well; subtasks without a store are ignored.
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+	public AggregatingSubtasksMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			Executor executor,
+			MetricFetcher<?> fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+	}
+
+	@Nonnull
+	@Override
+	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+		if (subtaskRanges.isEmpty()) {
+			MetricStore.TaskMetricStore taskMetricStore = store.getTaskMetricStore(jobID.toString(), taskID.toString());
+			if (taskMetricStore != null) {
+				return taskMetricStore.getAllSubtaskMetricStores();
+			} else {
+				return Collections.emptyList();
+			}
+		} else {
+			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+			for (int subtask : subtasks) {
+				MetricStore.ComponentMetricStore subtaskMetricStore = store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask);
+				if (subtaskMetricStore != null) {
+					subtaskStores.add(subtaskMetricStore);
+				}
+			}
+			return subtaskStores;
+		}
+	}
+
+	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+		UnionIterator<Integer> iterators = new UnionIterator<>();
+		// ranges that are not numeric are logged and skipped; a range whose start exceeds its end contributes nothing
+		for (String rawRange : ranges) {
+			try {
+				Iterator<Integer> rangeIterator;
+				String range = rawRange.trim();
+				int dashIdx = range.indexOf('-');
+				if (dashIdx == -1) {
+					// no dash: the range consists of a single subtask index
+					rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
+				} else {
+					// "start-end": evaluate the range with both bounds inclusive
+					final int start = Integer.valueOf(range.substring(0, dashIdx));
+					final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
+					rangeIterator = IntStream.rangeClosed(start, end).iterator();
+				}
+				iterators.add(rangeIterator);
+			} catch (NumberFormatException nfe) {
+				log.warn("Invalid value {} specified for integer range. Not a number.", rawRange, nfe);
+			}
+		}
+
+		return iterators;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
new file mode 100644
index 0000000..2e15cac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagersFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across task managers, a list of all available metrics or the values for
+ * a set of metrics. Without a taskmanager filter the metric stores of all task managers are used.
+ *
+ * <p>Specific taskmanagers can be selected for aggregation by specifying a comma-separated list of taskmanager IDs.
+ * {@code /metrics?get=X,Y&taskmanagers=A,B} Taskmanager IDs for which no metric store exists are ignored.
+ */
+public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMetricsHandler<AggregateTaskManagerMetricsParameters> {
+
+	public AggregatingTaskManagersMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout, Map<String, String> responseHeaders,
+			Executor executor,
+			MetricFetcher<?> fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher);
+	}
+
+	@Nonnull
+	@Override
+	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregateTaskManagerMetricsParameters> request) {
+		List<ResourceID> taskmanagers = request.getQueryParameter(TaskManagersFilterQueryParameter.class);
+		if (taskmanagers.isEmpty()) {
+			return store.getTaskManagers().values();
+		} else {
+			Collection<MetricStore.TaskManagerMetricStore> taskmanagerStores = new ArrayList<>(taskmanagers.size());
+			for (ResourceID taskmanager : taskmanagers) {
+				MetricStore.TaskManagerMetricStore taskManagerMetricStore = store.getTaskManagerMetricStore(taskmanager.getResourceIdString());
+				if (taskManagerMetricStore != null) {
+					taskmanagerStores.add(taskManagerMetricStore);
+				}
+			}
+			return taskmanagerStores;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
new file mode 100644
index 0000000..dc701d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values, bundled with max/min/sum/avg implementations and their factories.
+ */
+interface DoubleAccumulator {
+
+	/**
+	 * Adds the given value to this accumulator.
+	 *
+	 * @param value value to add
+	 */
+	void add(double value);
+
+	/**
+	 * Returns the current value of this accumulator.
+	 *
+	 * @return current value of this accumulator
+	 */
+	double getValue();
+
+	/**
+	 * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics.
+	 *
+	 * @return name of this accumulator type
+	 */
+	String getName();
+
+	/**
+	 * A factory for {@link DoubleAccumulator}s. This allows us to generate a new set of accumulators for each metric
+	 * without re-evaluating the "agg" query parameter or re-using existing accumulators.
+	 *
+	 * @param <A> DoubleAccumulator subclass
+	 */
+	interface DoubleAccumulatorFactory<A extends DoubleAccumulator> {
+		/**
+		 * Creates a new accumulator with the given initial value.
+		 *
+		 * @param init initial value
+		 * @return new accumulator with the given initial value
+		 */
+		A get(double init);
+	}
+
+	/**
+	 * Factory for {@link DoubleMaximum}; the single shared instance is obtained via {@link #get()}.
+	 */
+	final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> {
+		private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory();
+
+		private DoubleMaximumFactory(){
+		}
+
+		@Override
+		public DoubleMaximum get(double init) {
+			return new DoubleMaximum(init);
+		}
+
+		public static DoubleMaximumFactory get() {
+			return INSTANCE;
+		}
+	}
+
+	/**
+	 * Factory for {@link DoubleMinimum}; the single shared instance is obtained via {@link #get()}.
+	 */
+	final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> {
+		private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory();
+
+		private DoubleMinimumFactory(){
+		}
+
+		@Override
+		public DoubleMinimum get(double init) {
+			return new DoubleMinimum(init);
+		}
+
+		public static DoubleMinimumFactory get() {
+			return INSTANCE;
+		}
+	}
+
+	/**
+	 * Factory for {@link DoubleSum}; the single shared instance is obtained via {@link #get()}.
+	 */
+	final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> {
+		private static final DoubleSumFactory INSTANCE = new DoubleSumFactory();
+
+		private DoubleSumFactory(){
+		}
+
+		@Override
+		public DoubleSum get(double init) {
+			return new DoubleSum(init);
+		}
+
+		public static DoubleSumFactory get() {
+			return INSTANCE;
+		}
+	}
+
+	/**
+	 * Factory for {@link DoubleAverage}; the single shared instance is obtained via {@link #get()}.
+	 */
+	final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> {
+		private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory();
+
+		private DoubleAverageFactory(){
+		}
+
+		@Override
+		public DoubleAverage get(double init) {
+			return new DoubleAverage(init);
+		}
+
+		public static DoubleAverageFactory get() {
+			return INSTANCE;
+		}
+	}
+
+	/**
+	 * {@link DoubleAccumulator} that returns the maximum of the initial value and all added values.
+	 */
+	final class DoubleMaximum implements DoubleAccumulator {
+
+		public static final String NAME = "max";
+
+		private double value;
+
+		private DoubleMaximum(double init) {
+			value = init;
+		}
+
+		@Override
+		public void add(double value) {
+			this.value = Math.max(this.value, value);
+		}
+
+		@Override
+		public double getValue() {
+			return value;
+		}
+
+		@Override
+		public String getName() {
+			return NAME;
+		}
+	}
+
+	/**
+	 * {@link DoubleAccumulator} that returns the minimum of the initial value and all added values.
+	 */
+	final class DoubleMinimum implements DoubleAccumulator {
+
+		public static final String NAME = "min";
+
+		private double value;
+
+		private DoubleMinimum(double init) {
+			value = init;
+		}
+
+		@Override
+		public void add(double value) {
+			this.value = Math.min(this.value, value);
+		}
+
+		@Override
+		public double getValue() {
+			return value;
+		}
+
+		@Override
+		public String getName() {
+			return NAME;
+		}
+	}
+
+	/**
+	 * {@link DoubleAccumulator} that returns the sum of the initial value and all added values.
+	 */
+	final class DoubleSum implements DoubleAccumulator {
+
+		public static final String NAME = "sum";
+
+		private double value;
+
+		private DoubleSum(double init) {
+			value = init;
+		}
+
+		@Override
+		public void add(double value) {
+			this.value += value;
+		}
+
+		@Override
+		public double getValue() {
+			return value;
+		}
+
+		@Override
+		public String getName() {
+			return NAME;
+		}
+	}
+
+	/**
+	 * {@link DoubleAccumulator} that returns the average over the initial value and all added values.
+	 */
+	final class DoubleAverage implements DoubleAccumulator {
+
+		public static final String NAME = "avg";
+
+		private double sum;
+		private int count;
+
+		private DoubleAverage(double init) {
+			sum = init;
+			count = 1;
+		}
+
+		@Override
+		public void add(double value) {
+			this.sum += value;
+			this.count++;
+		}
+
+		@Override
+		public double getValue() {
+			return sum / count;
+		}
+
+		@Override
+		public String getName() {
+			return NAME;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
new file mode 100644
index 0000000..4100802
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Base {@link MessageHeaders} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsHeaders<P extends AbstractAggregatedMetricsParameters<?>> implements MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+	@Override
+	public Class<AggregatedMetricsResponseBody> getResponseClass() {
+		return AggregatedMetricsResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
new file mode 100644
index 0000000..a07141d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base {@link MessageParameters} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsParameters<M extends MessageQueryParameter<?>> extends MessageParameters {
+	private final MetricsFilterParameter metrics = new MetricsFilterParameter();
+	private final MetricsAggregationParameter aggs = new MetricsAggregationParameter();
+	private final M selector;
+
+	AbstractAggregatedMetricsParameters(M selector) {
+		this.selector = selector;
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.unmodifiableCollection(Arrays.asList(
+			metrics,
+			aggs,
+			selector
+		));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
new file mode 100644
index 0000000..0a053e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating task manager metrics.
+ */
+public class AggregateTaskManagerMetricsParameters extends AbstractAggregatedMetricsParameters<TaskManagersFilterQueryParameter> {
+	public AggregateTaskManagerMetricsParameters() {
+		super(new TaskManagersFilterQueryParameter());
+	}
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.emptyList();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
new file mode 100644
index 0000000..265512e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating job metrics.
+ */
+public class AggregatedJobMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedJobMetricsParameters> {
+
+	private static final AggregatedJobMetricsHeaders INSTANCE = new AggregatedJobMetricsHeaders();
+
+	private AggregatedJobMetricsHeaders() {
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/metrics";
+	}
+
+	@Override
+	public AggregatedJobMetricsParameters getUnresolvedMessageParameters() {
+		return new AggregatedJobMetricsParameters();
+	}
+
+	public static AggregatedJobMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
new file mode 100644
index 0000000..25df609
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating job metrics.
+ */
+public class AggregatedJobMetricsParameters extends AbstractAggregatedMetricsParameters<JobsFilterQueryParameter> {
+
+	public AggregatedJobMetricsParameters() {
+		super(new JobsFilterQueryParameter());
+	}
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.emptyList();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
new file mode 100644
index 0000000..acafc3a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for aggregated metrics. Contains the metric name and optionally the sum, average, minimum and maximum.
+ */
+public class AggregatedMetric {
+
+	private static final String FIELD_NAME_ID = "id";
+
+	private static final String FIELD_NAME_MIN = "min";
+
+	private static final String FIELD_NAME_MAX = "max";
+
+	private static final String FIELD_NAME_AVG = "avg";
+
+	private static final String FIELD_NAME_SUM = "sum";
+
+	@JsonProperty(value = FIELD_NAME_ID, required = true)
+	private final String id;
+
+	@JsonInclude(JsonInclude.Include.NON_NULL)
+	@JsonProperty(FIELD_NAME_MIN)
+	private final Double min;
+
+	@JsonInclude(JsonInclude.Include.NON_NULL)
+	@JsonProperty(FIELD_NAME_MAX)
+	private final Double max;
+
+	@JsonInclude(JsonInclude.Include.NON_NULL)
+	@JsonProperty(FIELD_NAME_AVG)
+	private final Double avg;
+
+	@JsonInclude(JsonInclude.Include.NON_NULL)
+	@JsonProperty(FIELD_NAME_SUM)
+	private final Double sum;
+
+	@JsonCreator
+	public AggregatedMetric(
+		final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
+		final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
+		final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
+		final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
+		final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) {
+
+		this.id = requireNonNull(id, "id must not be null");
+		this.min = min;
+		this.max = max;
+		this.avg = avg;
+		this.sum = sum;
+	}
+
+	public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
+		this(id, null, null, null, null);
+	}
+
+	@JsonIgnore
+	public String getId() {
+		return id;
+	}
+
+	@JsonIgnore
+	public Double getMin() {
+		return min;
+	}
+
+	@JsonIgnore
+	public Double getMax() {
+		return max;
+	}
+
+	@JsonIgnore
+	public Double getSum() {
+		return sum;
+	}
+
+	@JsonIgnore
+	public Double getAvg() {
+		return avg;
+	}
+
+	@Override
+	public String toString() {
+		return "AggregatedMetric{" +
+			"id='" + id + '\'' +
+			", min='" + min + '\'' +
+			", max='" + max + '\'' +
+			", avg='" + avg + '\'' +
+			", sum='" + sum + '\'' +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
new file mode 100644
index 0000000..b6b8dcc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Response type for a collection of aggregated metrics.
+ *
+ * <p>As JSON this type will be represented as an array of
+ * metrics, i.e., the field <code>metrics</code> will not show up. For example, a collection with a
+ * single metric will be represented as follows:
+ * <pre>
+ * {@code
+ * [{"id": "metricName", "min": 1.0}]
+ * }
+ * </pre>
+ *
+ * @see AggregatedMetricsResponseBody.Serializer
+ * @see AggregatedMetricsResponseBody.Deserializer
+ * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore
+ */
+@JsonSerialize(using = AggregatedMetricsResponseBody.Serializer.class)
+@JsonDeserialize(using = AggregatedMetricsResponseBody.Deserializer.class)
+public class AggregatedMetricsResponseBody implements ResponseBody {
+
+	private final Collection<AggregatedMetric> metrics;
+
+	public AggregatedMetricsResponseBody(Collection<AggregatedMetric> metrics) {
+
+		this.metrics = metrics;
+	}
+
+	@JsonIgnore
+	public Collection<AggregatedMetric> getMetrics() {
+		return metrics;
+	}
+
+	/**
+	 * JSON serializer for {@link AggregatedMetricsResponseBody}.
+	 */
+	public static class Serializer extends StdSerializer<AggregatedMetricsResponseBody> {
+
+		private static final long serialVersionUID = 1L;
+
+		protected Serializer() {
+			super(AggregatedMetricsResponseBody.class);
+		}
+
+		@Override
+		public void serialize(
+			AggregatedMetricsResponseBody metricCollectionResponseBody,
+			JsonGenerator jsonGenerator,
+			SerializerProvider serializerProvider) throws IOException {
+
+			jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics());
+		}
+	}
+
+	/**
+	 * JSON deserializer for {@link AggregatedMetricsResponseBody}.
+	 */
+	public static class Deserializer extends StdDeserializer<AggregatedMetricsResponseBody> {
+
+		private static final long serialVersionUID = 1L;
+
+		protected Deserializer() {
+			super(AggregatedMetricsResponseBody.class);
+		}
+
+		@Override
+		public AggregatedMetricsResponseBody deserialize(
+			JsonParser jsonParser,
+			DeserializationContext deserializationContext) throws IOException {
+
+			return new AggregatedMetricsResponseBody(jsonParser.readValueAs(
+				new TypeReference<List<AggregatedMetric>>() {
+				}));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
new file mode 100644
index 0000000..e1d0790
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+
+/**
+ * Headers for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedSubtaskMetricsParameters> {
+
+	private static final AggregatedSubtaskMetricsHeaders INSTANCE = new AggregatedSubtaskMetricsHeaders();
+
+	private AggregatedSubtaskMetricsHeaders() {
+	}
+
+	@Override
+	public AggregatedSubtaskMetricsParameters getUnresolvedMessageParameters() {
+		return new AggregatedSubtaskMetricsParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/subtasks/metrics";
+	}
+
+	public static AggregatedSubtaskMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
new file mode 100644
index 0000000..34e1b52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetricsParameters<SubtasksFilterQueryParameter> {
+
+	private final JobIDPathParameter jobId = new JobIDPathParameter();
+	private final JobVertexIdPathParameter vertexId = new JobVertexIdPathParameter();
+	private final SubtaskIndexPathParameter subtaskIndex = new SubtaskIndexPathParameter();
+
+	public AggregatedSubtaskMetricsParameters() {
+		super(new SubtasksFilterQueryParameter());
+	}
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.unmodifiableCollection(Arrays.asList(
+			jobId,
+			vertexId,
+			subtaskIndex
+		));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
new file mode 100644
index 0000000..5b5fe4c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating task manager metrics.
+ */
+public class AggregatedTaskManagerMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregateTaskManagerMetricsParameters> {
+
+	private static final AggregatedTaskManagerMetricsHeaders INSTANCE = new AggregatedTaskManagerMetricsHeaders();
+
+	private AggregatedTaskManagerMetricsHeaders() {
+	}
+
+	@Override
+	public AggregateTaskManagerMetricsParameters getUnresolvedMessageParameters() {
+		return new AggregateTaskManagerMetricsParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/taskmanagers/metrics";
+	}
+
+	public static AggregatedTaskManagerMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
new file mode 100644
index 0000000..fb57f87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+
+/**
+ * {@link MessageQueryParameter} for selecting jobs when aggregating metrics.
+ */
+public class JobsFilterQueryParameter extends MessageQueryParameter<JobID> {
+
+	JobsFilterQueryParameter() {
+		super("jobs", MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public JobID convertStringToValue(String value) throws ConversionException {
+		try {
+			return JobID.fromHexString(value);
+		} catch (IllegalArgumentException iae) {
+			throw new ConversionException("Not a valid job ID: " + value, iae);
+		}
+	}
+
+	@Override
+	public String convertValueToString(JobID value) {
+		return value.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
new file mode 100644
index 0000000..1057788
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * {@link MessageQueryParameter} for selecting the aggregations to compute when aggregating metrics.
+ */
+public class MetricsAggregationParameter extends MessageQueryParameter<MetricsAggregationParameter.AggregationMode> {
+
+	protected MetricsAggregationParameter() {
+		super("agg", MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public AggregationMode convertStringToValue(String value) throws ConversionException {
+		try {
+			return AggregationMode.valueOf(value.toUpperCase(Locale.ROOT));
+		} catch (IllegalArgumentException iae) {
+			throw new ConversionException("Not a valid aggregation: " + value, iae);
+		}
+	}
+
+	@Override
+	public String convertValueToString(AggregationMode value) {
+		return value.name().toLowerCase(Locale.ROOT); // locale-independent, mirrors convertStringToValue
+	}
+
+	/**
+	 * The available aggregations.
+	 */
+	public enum AggregationMode {
+		MIN,
+		MAX,
+		SUM,
+		AVG
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
new file mode 100644
index 0000000..fe5d37e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for selecting subtasks when aggregating metrics.
+ */
+public class SubtasksFilterQueryParameter extends MessageQueryParameter<String> {
+
+	SubtasksFilterQueryParameter() {
+		super("subtasks", MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public String convertStringToValue(String value) {
+		return value;
+	}
+
+	@Override
+	public String convertValueToString(String value) {
+		return value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
new file mode 100644
index 0000000..dcd6934
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * {@link MessageQueryParameter} for selecting task managers when aggregating metrics.
+ */
+public class TaskManagersFilterQueryParameter extends MessageQueryParameter<ResourceID> {
+
+	TaskManagersFilterQueryParameter() {
+		super("taskmanagers", MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public ResourceID convertStringToValue(String value) {
+		return new ResourceID(value);
+	}
+
+	@Override
+	public String convertValueToString(ResourceID value) {
+		return value.getResourceIdString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7e552de..fb663ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -56,6 +56,9 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
@@ -393,6 +396,33 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			responseHeaders,
 			metricFetcher);
 
+		final AggregatingTaskManagersMetricsHandler aggregatingTaskManagersMetricsHandler = new AggregatingTaskManagersMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			metricFetcher
+		);
+
+		final AggregatingJobsMetricsHandler aggregatingJobsMetricsHandler = new AggregatingJobsMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			metricFetcher
+		);
+
+		final AggregatingSubtasksMetricsHandler aggregatingSubtasksMetricsHandler = new AggregatingSubtasksMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			metricFetcher
+		);
+
 		final JobVertexTaskManagersHandler jobVertexTaskManagersHandler = new JobVertexTaskManagersHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -553,6 +583,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
 		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
 		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
+		handlers.add(Tuple2.of(aggregatingTaskManagersMetricsHandler.getMessageHeaders(), aggregatingTaskManagersMetricsHandler));
+		handlers.add(Tuple2.of(aggregatingJobsMetricsHandler.getMessageHeaders(), aggregatingJobsMetricsHandler));
+		handlers.add(Tuple2.of(aggregatingSubtasksMetricsHandler.getMessageHeaders(), aggregatingSubtasksMetricsHandler));
 		handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler));
 		handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler));
 		handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
new file mode 100644
index 0000000..2dac8bf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the {@link AggregatingJobsMetricsHandler}.
+ */
+public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingJobsMetricsHandler, AggregatedJobMetricsParameters> {
+
+	private static final JobID JOB_ID_1 = JobID.generate();
+	private static final JobID JOB_ID_2 = JobID.generate();
+	private static final JobID JOB_ID_3 = JobID.generate();
+
+	@Override
+	protected Tuple2<String, List<String>> getFilter() {
+		return Tuple2.of("jobs", Arrays.asList(JOB_ID_1.toString(), JOB_ID_3.toString()));
+	}
+
+	@Override
+	protected Collection<MetricDump> getMetricDumps() {
+		Collection<MetricDump> dumps = new ArrayList<>(3);
+		QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_1.toString(), "abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(job, "metric1", 1);
+		dumps.add(cd1);
+
+		QueryScopeInfo.JobQueryScopeInfo job2 = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_2.toString(), "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(job2, "metric1", 3);
+		dumps.add(cd2);
+
+		QueryScopeInfo.JobQueryScopeInfo job3 = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_3.toString(), "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job3, "metric2", 5);
+		dumps.add(cd3);
+		return dumps;
+	}
+
+	@Override
+	protected AggregatingJobsMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) {
+		return new AggregatingJobsMetricsHandler(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			executor,
+			fetcher
+		);
+	}
+}


[06/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

Posted by ch...@apache.org.
[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

This closes #5838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46

Branch: refs/heads/release-1.5
Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817
Parents: 50504ce
Author: zentol <ch...@apache.org>
Authored: Wed Apr 11 12:48:51 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  3 ++
 .../client/program/rest/RestClusterClient.java  |  3 +-
 .../program/rest/RestClusterClientTest.java     | 35 ++++++++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index ce6556b..65f470b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -1141,6 +1142,8 @@ public class CliFrontend {
 	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
+		config.setString(RestOptions.REST_ADDRESS, address.getHostString());
+		config.setInteger(RestOptions.REST_PORT, address.getPort());
 	}
 
 	public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6f676e..3d50e93 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -719,7 +719,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 				.orElse(false);
 	}
 
-	private CompletableFuture<URL> getWebMonitorBaseUrl() {
+	@VisibleForTesting
+	CompletableFuture<URL> getWebMonitorBaseUrl() {
 		return FutureUtils.orTimeout(
 				webMonitorLeaderRetriever.getLeaderFuture(),
 				restClusterClientConfiguration.getAwaitLeaderTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e7f9bf9..e2daad6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.apache.commons.cli.CommandLine;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -111,6 +114,7 @@ import org.mockito.MockitoAnnotations;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -677,6 +681,37 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that command line options override the configuration settings.
+	 */
+	@Test
+	public void testRESTManualConfigurationOverride() throws Exception {
+		final String configuredHostname = "localhost";
+		final int configuredPort = 1234;
+		final Configuration configuration = new Configuration();
+
+		configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
+		configuration.setInteger(JobManagerOptions.PORT, configuredPort);
+		configuration.setString(RestOptions.REST_ADDRESS, configuredHostname);
+		configuration.setInteger(RestOptions.REST_PORT, configuredPort);
+
+		final DefaultCLI defaultCLI = new DefaultCLI(configuration);
+
+		final String manualHostname = "123.123.123.123";
+		final int manualPort = 4321;
+		final String[] args = {"-m", manualHostname + ':' + manualPort};
+
+		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+
+		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+
+		final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+
+		URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
+		assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
+		assertThat(webMonitorBaseUrl.getPort(), equalTo(manualPort));
+	}
+
 	private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
 		public TestAccumulatorHandler() {


[05/11] flink git commit: [FLINK-9173][REST] Improve client error message for parsing failures

Posted by ch...@apache.org.
[FLINK-9173][REST] Improve client error message for parsing failures

- report the parsing exception for the expected response type, not the one from the ErrorResponse fallback
- add toString implementation to JsonResponse


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50504ced
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50504ced
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50504ced

Branch: refs/heads/release-1.5
Commit: 50504ced6f162bd9247f8da49889ad2ea0183c0d
Parents: c6d45b9
Author: zentol <ch...@apache.org>
Authored: Mon Apr 16 10:31:52 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/rest/RestClient.java  | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50504ced/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 6319634..df97f20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -217,7 +217,7 @@ public class RestClient {
 		try {
 			P response = objectMapper.readValue(jsonParser, responseType);
 			responseFuture.complete(response);
-		} catch (IOException ioe) {
+		} catch (IOException originalException) {
 			// the received response did not matched the expected response type
 
 			// lets see if it is an ErrorResponse instead
@@ -231,7 +231,7 @@ public class RestClient {
 				responseFuture.completeExceptionally(
 					new RestClientException(
 						"Response was neither of the expected type(" + responseType + ") nor an error.",
-						jpe2,
+						originalException,
 						rawResponse.getHttpResponseStatus()));
 			}
 		}
@@ -328,5 +328,13 @@ public class RestClient {
 		public HttpResponseStatus getHttpResponseStatus() {
 			return httpResponseStatus;
 		}
+
+		@Override
+		public String toString() {
+			return "JsonResponse{" +
+				"json=" + json +
+				", httpResponseStatus=" + httpResponseStatus +
+				'}';
+		}
 	}
 }


[07/11] flink git commit: [FLINK-9177][docs] Update Mesos getting started link

Posted by ch...@apache.org.
[FLINK-9177][docs] Update Mesos getting started link

This closes #5850.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39e9e19c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39e9e19c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39e9e19c

Branch: refs/heads/release-1.5
Commit: 39e9e19c5d663d5e69a845af8b00d7de20380101
Parents: 23d4543
Author: Arunan Sugunakumar <ar...@gmail.com>
Authored: Mon Apr 16 07:26:34 2018 +0000
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:33 2018 +0200

----------------------------------------------------------------------
 docs/ops/deployment/mesos.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39e9e19c/docs/ops/deployment/mesos.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 56f9d93..74bae39 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -101,7 +101,7 @@ You can also run Mesos without DC/OS.
 
 ### Installing Mesos
 
-Please follow the [instructions on how to setup Mesos on the official website](http://mesos.apache.org/documentation/latest/getting-started/).
+Please follow the [instructions on how to set up Mesos on the official website](http://mesos.apache.org/getting-started/).
 
 After installation you have to configure the set of master and agent nodes by creating the files `MESOS_HOME/etc/mesos/masters` and `MESOS_HOME/etc/mesos/slaves`.
 These files contain in each row a single hostname on which the respective component will be started (assuming SSH access to these nodes).