You are viewing a plain text version of this content. The canonical version is available in the commits@flink.apache.org mailing list archive.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 13:56:10 UTC

[1/9] flink git commit: [FLINK-7717][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint

Repository: flink
Updated Branches:
  refs/heads/master 44d973d51 -> 4daf9223a


[FLINK-7717][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler
to new handler, and add new handler to DispatcherRestEndpoint.

[FLINK-7717][flip6] Use taskmanagerid constant in TaskManagerMetricsHandlerTest

This closes #5081.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb85640b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb85640b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb85640b

Branch: refs/heads/master
Commit: cb85640bb3da2e64eafbfc5c749aab7463702ebd
Parents: 3bf67b4
Author: gyao <ga...@data-artisans.com>
Authored: Mon Nov 27 13:57:48 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:53 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 10 +++
 .../job/metrics/TaskManagerMetricsHandler.java  | 64 ++++++++++++++++++++
 .../job/metrics/TaskManagerMetricsHeaders.java  | 50 +++++++++++++++
 .../TaskManagerMetricsMessageParameters.java    | 41 +++++++++++++
 .../taskmanager/TaskManagerIdPathParameter.java |  2 +-
 .../metrics/TaskManagerMetricsHandlerTest.java  | 56 +++++++++++++++++
 .../metrics/TaskManagerMetricsHeadersTest.java  | 49 +++++++++++++++
 7 files changed, 271 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 2991f0b..d132890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCach
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
@@ -69,6 +70,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -315,6 +317,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			metricFetcher);
 
+		final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -351,6 +360,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
 		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
 		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
+		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
 
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
new file mode 100644
index 0000000..9a284d7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns TaskManager metrics.
+ *
+ * @see MetricStore#getTaskManagerMetricStore(String)
+ */
+public class TaskManagerMetricsHandler extends AbstractMetricsHandler<TaskManagerMetricsMessageParameters> {
+
+	public TaskManagerMetricsHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<DispatcherGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> headers,
+			final MetricFetcher metricFetcher) {
+		super(localRestAddress, leaderRetriever, timeout, headers, TaskManagerMetricsHeaders.getInstance(),
+			metricFetcher);
+	}
+
+	@Nullable
+	@Override
+	protected MetricStore.ComponentMetricStore getComponentMetricStore(
+			final HandlerRequest<EmptyRequestBody, TaskManagerMetricsMessageParameters> request,
+			final MetricStore metricStore) {
+		final ResourceID taskManagerId = request.getPathParameter(TaskManagerIdPathParameter.class);
+		return metricStore.getTaskManagerMetricStore(taskManagerId.toString());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java
new file mode 100644
index 0000000..ddc5fba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeaders.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+
+/**
+ * {@link MessageHeaders} for
+ * {@link org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler}.
+ */
+public final class TaskManagerMetricsHeaders extends
+	AbstractMetricsHeaders<TaskManagerMetricsMessageParameters> {
+
+	private static final TaskManagerMetricsHeaders INSTANCE = new TaskManagerMetricsHeaders();
+
+	private TaskManagerMetricsHeaders() {
+	}
+
+	@Override
+	public TaskManagerMetricsMessageParameters getUnresolvedMessageParameters() {
+		return new TaskManagerMetricsMessageParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/taskmanagers/:" + TaskManagerIdPathParameter.KEY + "/metrics";
+	}
+
+	public static TaskManagerMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
new file mode 100644
index 0000000..d7e9381
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for
+ * {@link org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler}.
+ */
+public class TaskManagerMetricsMessageParameters extends TaskManagerMessageParameters {
+
+	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singletonList(metricsFilterParameter);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
index 7456fee..f1daf0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -28,7 +28,7 @@ public class TaskManagerIdPathParameter extends MessagePathParameter<ResourceID>
 
 	public static final String KEY = "taskmanagerid";
 
-	protected TaskManagerIdPathParameter() {
+	public TaskManagerIdPathParameter() {
 		super(KEY);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..7414dac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandlerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Tests for {@link TaskManagerMetricsHandler}.
+ */
+public class TaskManagerMetricsHandlerTest extends
+	MetricsHandlerTestBase<TaskManagerMetricsHandler> {
+
+	private static final String TEST_TASK_MANAGER_ID = new InstanceID().toString();
+
+	@Override
+	TaskManagerMetricsHandler getMetricsHandler() {
+		return new TaskManagerMetricsHandler(
+			TEST_REST_ADDRESS,
+			leaderRetriever,
+			TIMEOUT,
+			TEST_HEADERS,
+			mockMetricFetcher);
+	}
+
+	@Override
+	QueryScopeInfo getQueryScopeInfo() {
+		return new QueryScopeInfo.TaskManagerQueryScopeInfo(TEST_TASK_MANAGER_ID);
+	}
+
+	@Override
+	Map<String, String> getPathParameters() {
+		return Collections.singletonMap(TaskManagerIdPathParameter.KEY, TEST_TASK_MANAGER_ID);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb85640b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
new file mode 100644
index 0000000..ee2848a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskManagerMetricsHeaders}.
+ */
+public class TaskManagerMetricsHeadersTest {
+
+	private final TaskManagerMetricsHeaders taskManagerMetricsHeaders =
+		TaskManagerMetricsHeaders.getInstance();
+
+	@Test
+	public void testUrl() {
+		assertThat(taskManagerMetricsHeaders.getTargetRestEndpointURL(),
+			equalTo("/taskmanagers/:" + TaskManagerIdPathParameter.KEY + "/metrics"));
+	}
+
+	@Test
+	public void testMessageParameters() {
+		assertThat(taskManagerMetricsHeaders.getUnresolvedMessageParameters(),
+			instanceOf(TaskManagerMetricsMessageParameters.class));
+	}
+
+}


[2/9] flink git commit: [FLINK-8150] [flip6] Add TaskManagerIdPathParameterTest

Posted by tr...@apache.org.
[FLINK-8150] [flip6] Add TaskManagerIdPathParameterTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bf67b46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bf67b46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bf67b46

Branch: refs/heads/master
Commit: 3bf67b46567baa84c8c1a1e0b5ec282227bcb0bc
Parents: dc7ab13
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 13:53:51 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:53 2017 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        | 10 ++--
 .../taskmanager/TaskManagerDetailsHandler.java  | 37 +++++++------
 .../messages/json/ResourceIDDeserializer.java   |  1 +
 .../messages/json/ResourceIDSerializer.java     |  3 +-
 .../taskmanager/TaskManagerDetailsInfo.java     |  2 +-
 .../taskmanager/TaskManagerIdPathParameter.java |  3 +-
 .../messages/taskmanager/TaskManagerInfo.java   |  6 +--
 .../ResourceManagerTaskExecutorTest.java        | 13 ++++-
 .../TaskManagerIdPathParameterTest.java         | 56 ++++++++++++++++++++
 9 files changed, 100 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4220ebf..a0ff5f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -503,14 +503,15 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		final ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
 
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
+			final ResourceID resourceId = taskExecutorEntry.getKey();
 			final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
 
 			taskManagerInfos.add(
 				new TaskManagerInfo(
-					taskExecutorEntry.getKey(),
+					resourceId,
 					taskExecutor.getTaskExecutorGateway().getAddress(),
 					taskExecutor.getDataPort(),
-					taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
+					taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
 					slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
 					slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
 					taskExecutor.getHardwareDescription()));
@@ -527,13 +528,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		if (taskExecutor == null) {
 			return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + resourceId + " is not registered."));
 		} else {
+			final InstanceID instanceId = taskExecutor.getInstanceID();
 			final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
 				resourceId,
 				taskExecutor.getTaskExecutorGateway().getAddress(),
 				taskExecutor.getDataPort(),
 				taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
-				slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
-				slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+				slotManager.getNumberRegisteredSlotsOf(instanceId),
+				slotManager.getNumberFreeSlotsOf(instanceId),
 				taskExecutor.getHardwareDescription());
 
 			return CompletableFuture.completedFuture(taskManagerInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
index b8c1a60..e66a61a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -71,32 +71,31 @@ public class TaskManagerDetailsHandler<T extends RestfulGateway> extends Abstrac
 	protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
 			@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
 			@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
-		final ResourceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
+		final ResourceID taskManagerResourceId = request.getPathParameter(TaskManagerIdPathParameter
+			.class);
 
-		CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);
+		CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerResourceId, timeout);
 
 		metricFetcher.update();
 
 		return taskManagerInfoFuture.thenApply(
 			(TaskManagerInfo taskManagerInfo) -> {
-				// the MetricStore is not yet thread safe, therefore we still have to synchronize it
-				synchronized (metricStore) {
-					final MetricStore.TaskManagerMetricStore tmMetrics = metricStore.getTaskManagerMetricStore(taskManagerInstanceId.toString());
-
-					final TaskManagerMetricsInfo taskManagerMetricsInfo;
-
-					if (tmMetrics != null) {
-						log.debug("Create metrics info for TaskManager {}.", taskManagerInstanceId);
-						taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
-					} else {
-						log.debug("No metrics for TaskManager {}.", taskManagerInstanceId);
-						taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
-					}
-
-					return new TaskManagerDetailsInfo(
-						taskManagerInfo,
-						taskManagerMetricsInfo);
+				final MetricStore.TaskManagerMetricStore tmMetrics =
+					metricStore.getTaskManagerMetricStore(taskManagerResourceId.getResourceIdString());
+
+				final TaskManagerMetricsInfo taskManagerMetricsInfo;
+
+				if (tmMetrics != null) {
+					log.debug("Create metrics info for TaskManager {}.", taskManagerResourceId);
+					taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
+				} else {
+					log.debug("No metrics for TaskManager {}.", taskManagerResourceId);
+					taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
 				}
+
+				return new TaskManagerDetailsInfo(
+					taskManagerInfo,
+					taskManagerMetricsInfo);
 			});
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
index cc0d086..0e0feb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
@@ -41,4 +41,5 @@ public class ResourceIDDeserializer extends StdDeserializer<ResourceID> {
 	public ResourceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
 		return new ResourceID(p.getValueAsString());
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
index fa95451..970d95f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
@@ -40,6 +40,7 @@ public class ResourceIDSerializer extends StdSerializer<ResourceID> {
 
 	@Override
 	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
-		gen.writeString(value.toString());
+		gen.writeString(value.getResourceIdString());
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 3b0a68f..f830ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -43,7 +43,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
 
 	@JsonCreator
 	public TaskManagerDetailsInfo(
-			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
+			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) ResourceID resourceId,
 			@JsonProperty(FIELD_NAME_ADDRESS) String address,
 			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
 			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
index b0cd2b5..7456fee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.rest.messages.ConversionException;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 
 /**
@@ -34,7 +33,7 @@ public class TaskManagerIdPathParameter extends MessagePathParameter<ResourceID>
 	}
 
 	@Override
-	protected ResourceID convertFromString(String value) throws ConversionException {
+	protected ResourceID convertFromString(String value) {
 		return new ResourceID(value);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index 1ad7e7d..e1452e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -38,7 +38,7 @@ import java.util.Objects;
  */
 public class TaskManagerInfo implements ResponseBody {
 
-	public static final String FIELD_NAME_INSTANCE_ID = "id";
+	public static final String FIELD_NAME_RESOURCE_ID = "id";
 
 	public static final String FIELD_NAME_ADDRESS = "path";
 
@@ -52,7 +52,7 @@ public class TaskManagerInfo implements ResponseBody {
 
 	public static final String FIELD_NAME_HARDWARE = "hardware";
 
-	@JsonProperty(FIELD_NAME_INSTANCE_ID)
+	@JsonProperty(FIELD_NAME_RESOURCE_ID)
 	@JsonSerialize(using = ResourceIDSerializer.class)
 	private final ResourceID resourceId;
 
@@ -76,7 +76,7 @@ public class TaskManagerInfo implements ResponseBody {
 
 	@JsonCreator
 	public TaskManagerInfo(
-			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
+			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) ResourceID resourceId,
 			@JsonProperty(FIELD_NAME_ADDRESS) String address,
 			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
 			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 524c503..66d3ad6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -48,10 +49,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ResourceManagerTaskExecutorTest extends TestLogger {
 
@@ -101,7 +105,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Test receive normal registration from task executor and receive duplicate registration from task executor
+	 * Tests that the resource manager accepts both a normal registration and a duplicate
+	 * registration from the same task executor.
 	 */
 	@Test
 	public void testRegisterTaskExecutor() throws Exception {
@@ -111,6 +116,10 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+				taskExecutorResourceID,
+				timeout).get();
+			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
@@ -168,6 +177,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.getAddress()).thenReturn(taskExecutorAddress);
+
 		ResourceID taskExecutorResourceID = ResourceID.generate();
 		rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
 		return taskExecutorResourceID;

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
new file mode 100644
index 0000000..f9c234f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link TaskManagerIdPathParameter}.
+ */
+public class TaskManagerIdPathParameterTest {
+
+	private TaskManagerIdPathParameter taskManagerIdPathParameter;
+
+	@Before
+	public void setUp() {
+		taskManagerIdPathParameter = new TaskManagerIdPathParameter();
+	}
+
+	@Test
+	public void testConversions() {
+		final String resourceIdString = "foo";
+		final ResourceID resourceId = taskManagerIdPathParameter.convertFromString(resourceIdString);
+		assertThat(resourceId.getResourceIdString(), equalTo(resourceIdString));
+
+		assertThat(taskManagerIdPathParameter.convertToString(resourceId), equalTo(resourceIdString));
+	}
+
+	@Test
+	public void testIsMandatory() {
+		assertTrue(taskManagerIdPathParameter.isMandatory());
+	}
+
+}


[6/9] flink git commit: [hotfix][Javadoc] Fix typo in ConversionException

Posted by tr...@apache.org.
[hotfix][Javadoc] Fix typo in ConversionException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64636857
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64636857
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64636857

Branch: refs/heads/master
Commit: 6463685730cb26b2b2318ad57645b340113417de
Parents: 49a8996
Author: gyao <ga...@data-artisans.com>
Authored: Mon Nov 27 14:24:42 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:54 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/messages/ConversionException.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64636857/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
index 7feceb3..a984b1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.rest.messages;
 import org.apache.flink.util.FlinkException;
 
 /**
- * Exception which is thrown if an input cannot converted into the requested type.
+ * Exception which is thrown if an input cannot be converted into the requested type.
  */
 public class ConversionException extends FlinkException {
 


[4/9] flink git commit: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Posted by tr...@apache.org.
[FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to
new handler. Add new handler to DispatcherRestEndpoint.

[FLINK-8143][flip6] Assert that SubtaskIndexPathParameter is mandatory

[FLINK-8143][flip6] Use path parameter constants in SubtaskMetricsHandlerTest

This closes #5082.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f6b63b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f6b63b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f6b63b

Branch: refs/heads/master
Commit: 97f6b63b12ba9f3fab119e997c738b38d3adbef9
Parents: 6463685
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 16:30:27 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:54 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 10 +++
 .../job/metrics/JobVertexMetricsHandler.java    |  2 +
 .../job/metrics/SubtaskMetricsHandler.java      | 72 ++++++++++++++++++++
 .../messages/SubtaskIndexPathParameter.java     | 47 +++++++++++++
 .../job/metrics/SubtaskMetricsHeaders.java      | 53 ++++++++++++++
 .../SubtaskMetricsMessageParameters.java        | 60 ++++++++++++++++
 .../job/metrics/SubtaskMetricsHandlerTest.java  | 69 +++++++++++++++++++
 .../messages/SubtaskIndexPathParameterTest.java | 67 ++++++++++++++++++
 .../job/metrics/SubtaskMetricsHeadersTest.java  | 50 ++++++++++++++
 9 files changed, 430 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 9916371..8a26a9fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatis
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisti
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -319,6 +321,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			metricFetcher);
 
+		final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
 		final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -369,6 +378,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
 		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
 		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
+		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
 		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
 		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
index 4f83db2..f8f4702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
@@ -39,6 +39,8 @@ import java.util.concurrent.CompletableFuture;
  * Handler that returns metrics given a {@link JobID} and {@link JobVertexID}.
  *
  * @see MetricStore#getTaskMetricStore(String, String)
+ * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is only kept for
+ * backwards-compatibility.
  */
 public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMetricsMessageParameters> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
new file mode 100644
index 0000000..cb3d864
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns metrics given a {@link JobID}, {@link JobVertexID}, and subtask index.
+ *
+ * @see MetricStore#getSubtaskMetricStore(String, String, int)
+ */
+public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetricsMessageParameters> {
+
+	public SubtaskMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> headers,
+			MetricFetcher metricFetcher) {
+
+		super(localRestAddress, leaderRetriever, timeout, headers, SubtaskMetricsHeaders.getInstance(),
+			metricFetcher);
+	}
+
+	@Nullable
+	@Override
+	protected MetricStore.ComponentMetricStore getComponentMetricStore(
+			HandlerRequest<EmptyRequestBody, SubtaskMetricsMessageParameters> request,
+			MetricStore metricStore) {
+
+		final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
+		final int subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);
+
+		return metricStore.getSubtaskMetricStore(jobId.toString(), vertexId.toString(), subtaskIndex);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
new file mode 100644
index 0000000..e8f2268
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Path parameter specifying the non-negative index of a subtask.
+ */
+public class SubtaskIndexPathParameter extends MessagePathParameter<Integer> {
+
+	public static final String KEY = "subtaskindex";
+
+	public SubtaskIndexPathParameter() {
+		super(KEY);
+	}
+
+	@Override
+	protected Integer convertFromString(final String value) throws ConversionException {
+		final int subtaskIndex = Integer.parseInt(value);
+		if (subtaskIndex >= 0) {
+			return subtaskIndex;
+		} else {
+			throw new ConversionException("subtaskindex must be positive, was: " + subtaskIndex);
+		}
+	}
+
+	@Override
+	protected String convertToString(final Integer value) {
+		return value.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
new file mode 100644
index 0000000..5471020
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+/**
+ * {@link MessageHeaders} for {@link SubtaskMetricsHandler}.
+ */
+public final class SubtaskMetricsHeaders extends
+	AbstractMetricsHeaders<SubtaskMetricsMessageParameters> {
+
+	private static final SubtaskMetricsHeaders INSTANCE = new SubtaskMetricsHeaders();
+
+	private SubtaskMetricsHeaders() {
+	}
+
+	@Override
+	public SubtaskMetricsMessageParameters getUnresolvedMessageParameters() {
+		return new SubtaskMetricsMessageParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY +
+			"/subtasks/:" + SubtaskIndexPathParameter.KEY + "/metrics";
+	}
+
+	public static SubtaskMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
new file mode 100644
index 0000000..bdfa003
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsMessageParameters extends MessageParameters {
+
+	private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
+
+	private final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter();
+
+	private final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+
+	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.unmodifiableCollection(Arrays.asList(
+			jobIDPathParameter,
+			jobVertexIdPathParameter,
+			subtaskIndexPathParameter
+		));
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singletonList(metricsFilterParameter);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
new file mode 100644
index 0000000..e38ff8a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMetricsHandler> {
+
+	private static final String TEST_JOB_ID = new JobID().toString();
+
+	private static final String TEST_VERTEX_ID = new JobVertexID().toString();
+
+	private static final int TEST_SUBTASK_INDEX = 0;
+
+	@Override
+	SubtaskMetricsHandler getMetricsHandler() {
+		return new SubtaskMetricsHandler(
+			CompletableFuture.completedFuture("localhost:12345"),
+			leaderRetriever,
+			TIMEOUT,
+			TEST_HEADERS,
+			mockMetricFetcher
+		);
+	}
+
+	@Override
+	QueryScopeInfo getQueryScopeInfo() {
+		return new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID, TEST_VERTEX_ID,
+			TEST_SUBTASK_INDEX);
+	}
+
+	@Override
+	Map<String, String> getPathParameters() {
+		final Map<String, String> pathParameters = new HashMap<>();
+		pathParameters.put(JobIDPathParameter.KEY, TEST_JOB_ID);
+		pathParameters.put(JobVertexIdPathParameter.KEY, TEST_VERTEX_ID);
+		pathParameters.put(SubtaskIndexPathParameter.KEY, Integer.toString(TEST_SUBTASK_INDEX));
+		return pathParameters;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
new file mode 100644
index 0000000..f018fd4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SubtaskIndexPathParameter}.
+ */
+public class SubtaskIndexPathParameterTest {
+
+	private SubtaskIndexPathParameter subtaskIndexPathParameter;
+
+	@Before
+	public void setUp() {
+		subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+	}
+
+	@Test
+	public void testConversionFromString() throws Exception {
+		assertThat(subtaskIndexPathParameter.convertFromString("2147483647"), equalTo(Integer.MAX_VALUE));
+	}
+
+	@Test
+	public void testConversionFromStringNegativeNumber() throws Exception {
+		try {
+			subtaskIndexPathParameter.convertFromString("-2147483648");
+			fail("Expected exception not thrown");
+		} catch (final ConversionException e) {
+			assertThat(e.getMessage(), equalTo("subtaskindex must be positive, was: " + Integer
+				.MIN_VALUE));
+		}
+	}
+
+	@Test
+	public void testConvertToString() throws Exception {
+		assertThat(subtaskIndexPathParameter.convertToString(Integer.MAX_VALUE), equalTo("2147483647"));
+	}
+
+	@Test
+	public void testIsMandatoryParameter() {
+		assertTrue(subtaskIndexPathParameter.isMandatory());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
new file mode 100644
index 0000000..345ad74
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SubtaskMetricsHeaders}.
+ */
+public class SubtaskMetricsHeadersTest {
+
+	private final SubtaskMetricsHeaders subtaskMetricsHeaders = SubtaskMetricsHeaders.getInstance();
+
+	@Test
+	public void testUrl() {
+		assertThat(subtaskMetricsHeaders.getTargetRestEndpointURL(),
+			equalTo("/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY +
+				"/subtasks/:" + SubtaskIndexPathParameter.KEY + "/metrics"));
+	}
+
+	@Test
+	public void testMessageParameters() {
+		assertThat(subtaskMetricsHeaders.getUnresolvedMessageParameters(),
+			instanceOf(SubtaskMetricsMessageParameters.class));
+	}
+}


[5/9] flink git commit: [FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint

Posted by tr...@apache.org.
[FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler
to new handler and add new handler to DispatcherRestEndpoint.

[FLINK-7716][Javadoc] Deprecate method MetricStore#getJobManager().

There is a semantically equivalent method in MetricStore.

This closes #5083.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49a89960
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49a89960
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49a89960

Branch: refs/heads/master
Commit: 49a89960bc7010c6f5f9d28e7d7f26c94d517f16
Parents: cb85640
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 16:07:48 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:54 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 10 ++++
 .../job/metrics/JobManagerMetricsHandler.java   | 59 ++++++++++++++++++++
 .../handler/legacy/metrics/MetricStore.java     |  4 ++
 .../job/metrics/JobManagerMetricsHeaders.java   | 49 ++++++++++++++++
 .../JobManagerMetricsMessageParameters.java     | 46 +++++++++++++++
 .../metrics/JobManagerMetricsHandlerTest.java   | 51 +++++++++++++++++
 .../metrics/JobManagerMetricsHeadersTest.java   | 46 +++++++++++++++
 7 files changed, 265 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index d132890..9916371 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -69,6 +70,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDet
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
@@ -324,6 +326,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			metricFetcher);
 
+		final JobManagerMetricsHandler jobManagerMetricsHandler = new JobManagerMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -361,6 +370,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
 		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
 		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
+		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
 
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
new file mode 100644
index 0000000..0d953c6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns JobManager metrics.
+ */
+public class JobManagerMetricsHandler extends AbstractMetricsHandler<JobManagerMetricsMessageParameters> {
+
+	public JobManagerMetricsHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<DispatcherGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> headers,
+			final MetricFetcher metricFetcher) {
+		super(localRestAddress, leaderRetriever, timeout, headers, JobManagerMetricsHeaders.getInstance(),
+			metricFetcher);
+	}
+
+	@Nullable
+	@Override
+	protected MetricStore.ComponentMetricStore getComponentMetricStore(
+			final HandlerRequest<EmptyRequestBody, JobManagerMetricsMessageParameters> request,
+			final MetricStore metricStore) {
+		return metricStore.getJobManagerMetricStore();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 26025e0..8191052 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -162,6 +162,10 @@ public class MetricStore {
 		return unmodifiableMap(taskManagers);
 	}
 
+	/**
+	 * @deprecated Use semantically equivalent {@link #getJobManagerMetricStore()}.
+	 */
+	@Deprecated
 	public synchronized ComponentMetricStore getJobManager() {
 		return ComponentMetricStore.unmodifiable(jobManager);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeaders.java
new file mode 100644
index 0000000..f819ff4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeaders.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+/**
+ * {@link MessageHeaders} for
+ * {@link org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler}.
+ */
+public final class JobManagerMetricsHeaders extends
+	AbstractMetricsHeaders<JobManagerMetricsMessageParameters> {
+
+	private static final JobManagerMetricsHeaders INSTANCE = new JobManagerMetricsHeaders();
+
+	private JobManagerMetricsHeaders() {
+	}
+
+	@Override
+	public JobManagerMetricsMessageParameters getUnresolvedMessageParameters() {
+		return new JobManagerMetricsMessageParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobmanager/metrics";
+	}
+
+	public static JobManagerMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
new file mode 100644
index 0000000..7fd6372
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for
+ * {@link org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler}.
+ */
+public class JobManagerMetricsMessageParameters extends MessageParameters {
+
+	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singleton(metricsFilterParameter);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..f182364
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobManagerMetricsHandlerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Tests for {@link JobManagerMetricsHandler}.
+ */
+public class JobManagerMetricsHandlerTest extends MetricsHandlerTestBase<JobManagerMetricsHandler> {
+
+	@Override
+	JobManagerMetricsHandler getMetricsHandler() {
+		return new JobManagerMetricsHandler(
+			TEST_REST_ADDRESS,
+			leaderRetriever,
+			TIMEOUT,
+			TEST_HEADERS,
+			mockMetricFetcher);
+	}
+
+	@Override
+	QueryScopeInfo getQueryScopeInfo() {
+		return new QueryScopeInfo.JobManagerQueryScopeInfo();
+	}
+
+	@Override
+	Map<String, String> getPathParameters() {
+		return Collections.emptyMap();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49a89960/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
new file mode 100644
index 0000000..f6c9171
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JobManagerMetricsHeaders}.
+ */
+public class JobManagerMetricsHeadersTest {
+
+	private final JobManagerMetricsHeaders jobManagerMetricsHeaders =
+		JobManagerMetricsHeaders.getInstance();
+
+	@Test
+	public void testUrl() {
+		assertThat(jobManagerMetricsHeaders.getTargetRestEndpointURL(), equalTo("/jobmanager/metrics"));
+	}
+
+	@Test
+	public void testMessageParameters() {
+		assertThat(jobManagerMetricsHeaders.getUnresolvedMessageParameters(), instanceOf
+			(JobManagerMetricsMessageParameters.class));
+	}
+
+}


[3/9] flink git commit: [FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id

Posted by tr...@apache.org.
[FLINK-8150] [flip6] Expose TaskExecutor's ResourceID as TaskExecutor id

Before, the TaskExecutor's InstanceID was exposed as TaskExecutor id. This was wrong
since the InstanceID is bound to the registration of a TaskExecutor whereas the
ResourceID is bound to the lifetime of the TaskExecutor. Thus, it is better to identify
the TaskExecutor by its ResourceID which does not change.

This commit changes the behaviour accordingly on the ResourceManager and the
TaskManagerDetailsHandler.

This closes #5093.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc7ab135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc7ab135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc7ab135

Branch: refs/heads/master
Commit: dc7ab1355e5e84d50313eec50b930e09d6bd759f
Parents: 44d973d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 28 12:43:39 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:53 2017 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        |  29 ++--
 .../resourcemanager/ResourceManagerGateway.java |   4 +-
 .../taskmanager/TaskManagerDetailsHandler.java  |   4 +-
 .../messages/json/InstanceIDDeserializer.java   |  45 ------
 .../messages/json/InstanceIDSerializer.java     |  44 ------
 .../messages/json/ResourceIDDeserializer.java   |  44 ++++++
 .../messages/json/ResourceIDSerializer.java     |  45 ++++++
 .../taskmanager/TaskManagerDetailsInfo.java     |  10 +-
 .../taskmanager/TaskManagerIdPathParameter.java |  13 +-
 .../messages/taskmanager/TaskManagerInfo.java   |  22 +--
 .../clusterframework/ResourceManagerTest.java   |  13 +-
 .../resourcemanager/ResourceManagerTest.java    | 153 +++++++++++++++++++
 .../resourcemanager/TestingResourceManager.java |  76 +++++++++
 .../utils/TestingResourceManagerGateway.java    |   2 +-
 .../taskmanager/TaskManagerInfoTest.java        |   4 +-
 .../TestingTaskExecutorGateway.java             | 124 +++++++++++++++
 16 files changed, 489 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3ba8808..4220ebf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -500,14 +500,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	@Override
 	public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
 
-		ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
+		final ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
 
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
 			final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
 
 			taskManagerInfos.add(
 				new TaskManagerInfo(
-					taskExecutor.getInstanceID(),
+					taskExecutorEntry.getKey(),
 					taskExecutor.getTaskExecutorGateway().getAddress(),
 					taskExecutor.getDataPort(),
 					taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
@@ -520,29 +520,20 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	}
 
 	@Override
-	public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+	public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID resourceId, Time timeout) {
 
-		Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry = null;
+		WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(resourceId);
 
-		// TODO: Implement more efficiently
-		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerEntry : taskExecutors.entrySet()) {
-			if (Objects.equals(workerEntry.getValue().getInstanceID(), instanceId)) {
-				taskExecutorEntry = workerEntry;
-				break;
-			}
-		}
-
-		if (taskExecutorEntry == null) {
-			return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + instanceId + " is not registered."));
+		if (taskExecutor == null) {
+			return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + resourceId + " is not registered."));
 		} else {
-			WorkerRegistration<?> taskExecutor = taskExecutorEntry.getValue();
 			final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
-				taskExecutor.getInstanceID(),
+				resourceId,
 				taskExecutor.getTaskExecutorGateway().getAddress(),
 				taskExecutor.getDataPort(),
-				taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
-				slotManager.getNumberRegisteredSlotsOf(instanceId),
-				slotManager.getNumberFreeSlotsOf(instanceId),
+				taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
+				slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
+				slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
 				taskExecutor.getHardwareDescription());
 
 			return CompletableFuture.completedFuture(taskManagerInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 88538ff..41a0794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -185,11 +185,11 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	/**
 	 * Requests information about the given {@link TaskExecutor}.
 	 *
-	 * @param instanceId identifying the TaskExecutor for which to return information
+	 * @param taskManagerId identifying the TaskExecutor for which to return information
 	 * @param timeout of the request
 	 * @return Future TaskManager information
 	 */
-	CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, @RpcTimeout Time timeout);
+	CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID taskManagerId, @RpcTimeout Time timeout);
 	 
 	/**
 	 * Requests the resource overview. The resource overview provides information about the

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
index bca2961..b8c1a60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.rest.handler.taskmanager;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -71,7 +71,7 @@ public class TaskManagerDetailsHandler<T extends RestfulGateway> extends Abstrac
 	protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
 			@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
 			@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
-		final InstanceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
+		final ResourceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
 
 		CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
deleted file mode 100644
index 1c53dcd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages.json;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-
-import java.io.IOException;
-
-/**
- * Json deserializer for {@link InstanceID}.
- */
-public class InstanceIDDeserializer extends StdDeserializer<InstanceID> {
-
-	private static final long serialVersionUID = -9058463293913469849L;
-
-	protected InstanceIDDeserializer() {
-		super(InstanceID.class);
-	}
-
-	@Override
-	public InstanceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
-		return new InstanceID(StringUtils.hexStringToByte(p.getValueAsString()));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
deleted file mode 100644
index f3c0dc5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages.json;
-
-import org.apache.flink.runtime.instance.InstanceID;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-import java.io.IOException;
-
-/**
- * Json serializer for {@link InstanceID}.
- */
-public class InstanceIDSerializer extends StdSerializer<InstanceID> {
-
-	private static final long serialVersionUID = 5798852092159615938L;
-
-	protected InstanceIDSerializer() {
-		super(InstanceID.class);
-	}
-
-	@Override
-	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
-		gen.writeString(value.toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
new file mode 100644
index 0000000..cc0d086
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Json deserializer for {@link ResourceID}.
+ */
+public class ResourceIDDeserializer extends StdDeserializer<ResourceID> {
+
+	private static final long serialVersionUID = -9058463293913469849L;
+
+	protected ResourceIDDeserializer() {
+		super(ResourceID.class);
+	}
+
+	@Override
+	public ResourceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+		return new ResourceID(p.getValueAsString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
new file mode 100644
index 0000000..fa95451
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Json serializer for {@link ResourceID} (supersedes the removed {@link InstanceID} serializer).
+ */
+public class ResourceIDSerializer extends StdSerializer<ResourceID> {
+
+	private static final long serialVersionUID = 5798852092159615938L;
+
+	protected ResourceIDSerializer() {
+		super(ResourceID.class);
+	}
+
+	@Override
+	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+		gen.writeString(value.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 3764a26..3b0a68f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.ResourceIDDeserializer;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.util.Preconditions;
 
@@ -43,7 +43,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
 
 	@JsonCreator
 	public TaskManagerDetailsInfo(
-			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
 			@JsonProperty(FIELD_NAME_ADDRESS) String address,
 			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
 			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
@@ -52,7 +52,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
 			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
 			@JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
 		super(
-			instanceId,
+			resourceId,
 			address,
 			dataPort,
 			lastHeartbeat,
@@ -65,7 +65,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
 
 	public TaskManagerDetailsInfo(TaskManagerInfo taskManagerInfo, TaskManagerMetricsInfo taskManagerMetrics) {
 		this(
-			taskManagerInfo.getInstanceId(),
+			taskManagerInfo.getResourceId(),
 			taskManagerInfo.getAddress(),
 			taskManagerInfo.getDataPort(),
 			taskManagerInfo.getLastHeartbeat(),

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
index 2ff7909..b0cd2b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.rest.messages.ConversionException;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
-import org.apache.flink.util.StringUtils;
 
 /**
  * TaskManager id path parameter used by TaskManager related handlers.
  */
-public class TaskManagerIdPathParameter extends MessagePathParameter<InstanceID> {
+public class TaskManagerIdPathParameter extends MessagePathParameter<ResourceID> {
 
 	public static final String KEY = "taskmanagerid";
 
@@ -35,12 +34,12 @@ public class TaskManagerIdPathParameter extends MessagePathParameter<InstanceID>
 	}
 
 	@Override
-	protected InstanceID convertFromString(String value) throws ConversionException {
-		return new InstanceID(StringUtils.hexStringToByte(value));
+	protected ResourceID convertFromString(String value) throws ConversionException {
+		return new ResourceID(value);
 	}
 
 	@Override
-	protected String convertToString(InstanceID value) {
-		return StringUtils.byteToHexString(value.getBytes());
+	protected String convertToString(ResourceID value) {
+		return value.getResourceIdString();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index 1a07273..1ad7e7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
-import org.apache.flink.runtime.rest.messages.json.InstanceIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.ResourceIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.ResourceIDSerializer;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.util.Preconditions;
 
@@ -53,8 +53,8 @@ public class TaskManagerInfo implements ResponseBody {
 	public static final String FIELD_NAME_HARDWARE = "hardware";
 
 	@JsonProperty(FIELD_NAME_INSTANCE_ID)
-	@JsonSerialize(using = InstanceIDSerializer.class)
-	private final InstanceID instanceId;
+	@JsonSerialize(using = ResourceIDSerializer.class)
+	private final ResourceID resourceId;
 
 	@JsonProperty(FIELD_NAME_ADDRESS)
 	private final String address;
@@ -76,14 +76,14 @@ public class TaskManagerInfo implements ResponseBody {
 
 	@JsonCreator
 	public TaskManagerInfo(
-			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
 			@JsonProperty(FIELD_NAME_ADDRESS) String address,
 			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
 			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
 			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
 			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
 			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) {
-		this.instanceId = Preconditions.checkNotNull(instanceId);
+		this.resourceId = Preconditions.checkNotNull(resourceId);
 		this.address = Preconditions.checkNotNull(address);
 		this.dataPort = dataPort;
 		this.lastHeartbeat = lastHeartbeat;
@@ -92,8 +92,8 @@ public class TaskManagerInfo implements ResponseBody {
 		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
 	}
 
-	public InstanceID getInstanceId() {
-		return instanceId;
+	public ResourceID getResourceId() {
+		return resourceId;
 	}
 
 	public String getAddress() {
@@ -133,7 +133,7 @@ public class TaskManagerInfo implements ResponseBody {
 			lastHeartbeat == that.lastHeartbeat &&
 			numberSlots == that.numberSlots &&
 			numberAvailableSlots == that.numberAvailableSlots &&
-			Objects.equals(instanceId, that.instanceId) &&
+			Objects.equals(resourceId, that.resourceId) &&
 			Objects.equals(address, that.address) &&
 			Objects.equals(hardwareDescription, that.hardwareDescription);
 	}
@@ -141,7 +141,7 @@ public class TaskManagerInfo implements ResponseBody {
 	@Override
 	public int hashCode() {
 		return Objects.hash(
-			instanceId,
+			resourceId,
 			address,
 			dataPort,
 			lastHeartbeat,

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 2b58b6f..c238e33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
@@ -62,6 +60,9 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -70,8 +71,6 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import scala.Option;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -80,7 +79,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.*;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
new file mode 100644
index 0000000..3050718
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+public class ResourceManagerTest extends TestLogger {
+
+	private TestingRpcService rpcService;
+
+	@Before
+	public void setUp() {
+		if (rpcService != null) {
+			rpcService.stopService();
+			rpcService = null;
+		}
+
+		rpcService = new TestingRpcService();
+	}
+
+	@After
+	public void tearDown() {
+		if (rpcService != null) {
+			rpcService.stopService();
+			rpcService = null;
+		}
+	}
+
+	/**
+	 * Tests that we can retrieve the correct {@link TaskManagerInfo} from the {@link ResourceManager}.
+	 */
+	@Test
+	public void testRequestTaskManagerInfo() throws Exception {
+		final Configuration configuration = new Configuration();
+		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		final SlotManager slotManager = new SlotManager(
+			rpcService.getScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			TestingUtils.infiniteTime());
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+
+		final TestingResourceManager resourceManager = new TestingResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			ResourceID.generate(),
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			new HeartbeatServices(1000L, 10000L),
+			slotManager,
+			new NoOpMetricRegistry(),
+			jobLeaderIdService,
+			testingFatalErrorHandler);
+
+		resourceManager.start();
+
+		try {
+			final ResourceID taskManagerId = ResourceID.generate();
+			final ResourceManagerGateway resourceManagerGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+
+			// first make the ResourceManager the leader
+			resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+			rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+
+			final HardwareDescription hardwareDescription = new HardwareDescription(
+				42,
+				1337L,
+				1337L,
+				0L);
+
+			final int dataPort = 1234;
+
+			CompletableFuture<RegistrationResponse> registrationResponseFuture = resourceManagerGateway.registerTaskExecutor(
+				taskExecutorGateway.getAddress(),
+				taskManagerId,
+				new SlotReport(),
+				dataPort,
+				hardwareDescription,
+				TestingUtils.TIMEOUT());
+
+			Assert.assertTrue(registrationResponseFuture.get() instanceof RegistrationResponse.Success);
+
+			CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = resourceManagerGateway.requestTaskManagerInfo(
+				taskManagerId,
+				TestingUtils.TIMEOUT());
+
+			TaskManagerInfo taskManagerInfo = taskManagerInfoFuture.get();
+
+			Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId());
+			Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
+			Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
+			Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
+			Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
+			Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
+
+			testingFatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(resourceManager, TestingUtils.TIMEOUT());
+			highAvailabilityServices.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
new file mode 100644
index 0000000..0d30822
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Simple {@link ResourceManager} implementation for testing purposes.
+ */
+public class TestingResourceManager extends ResourceManager<ResourceID> {
+
+	public TestingResourceManager(
+			RpcService rpcService,
+			String resourceManagerEndpointId,
+			ResourceID resourceId,
+			ResourceManagerConfiguration resourceManagerConfiguration,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			SlotManager slotManager,
+			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
+			FatalErrorHandler fatalErrorHandler) {
+		super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);
+	}
+
+	@Override
+	protected void initialize() throws ResourceManagerException {
+		// noop
+	}
+
+	@Override
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException {
+		// noop
+	}
+
+	@Override
+	public void startNewWorker(ResourceProfile resourceProfile) {
+		// noop
+	}
+
+	@Override
+	protected ResourceID workerStarted(ResourceID resourceID) {
+		return resourceID;
+	}
+
+	@Override
+	public boolean stopWorker(ResourceID worker) {
+		// cannot stop workers
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index f11a1eb..0f8b5fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -200,7 +200,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+	public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(ResourceID resourceId, Time timeout) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index 12514b7..9a1e85d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 
 import java.util.Random;
@@ -44,7 +44,7 @@ public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskMan
 
 	static TaskManagerInfo createRandomTaskManagerInfo() {
 		return new TaskManagerInfo(
-			new InstanceID(),
+			ResourceID.generate(),
 			UUID.randomUUID().toString(),
 			random.nextInt(),
 			random.nextLong(),

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7ab135/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
new file mode 100644
index 0000000..a9b676e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple {@link TaskExecutorGateway} implementation for testing purposes.
+ */
+public class TestingTaskExecutorGateway implements TaskExecutorGateway {
+
+	private final String address;
+
+	private final String hostname;
+
+	public TestingTaskExecutorGateway() {
+		this("foobar:1234", "foobar");
+	}
+
+	public TestingTaskExecutorGateway(String address, String hostname) {
+		this.address = Preconditions.checkNotNull(address);
+		this.hostname = Preconditions.checkNotNull(hostname);
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public void failPartition(ExecutionAttemptID executionAttemptID) {
+		// noop
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+		// noop
+	}
+
+	@Override
+	public void heartbeatFromResourceManager(ResourceID heartbeatOrigin) {
+		// noop
+	}
+
+	@Override
+	public void disconnectJobManager(JobID jobId, Exception cause) {
+		// noop
+	}
+
+	@Override
+	public void disconnectResourceManager(Exception cause) {
+		// noop
+	}
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
+
+	@Override
+	public String getHostname() {
+		return hostname;
+	}
+}


[9/9] flink git commit: [FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint

Posted by tr...@apache.org.
[FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler to new
handler, and add new handler to DispatcherRestEndpoint.

[FLINK-7694][flip6] Use jobid path parameter constant in JobMetricsHandlerTest

This closes #5084.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4daf9223
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4daf9223
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4daf9223

Branch: refs/heads/master
Commit: 4daf9223a7b2ca8aec62fefd06b0152cbc2aa668
Parents: 79b5bd4
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 16:42:13 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:55 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 10 ++++
 .../handler/job/metrics/JobMetricsHandler.java  | 60 ++++++++++++++++++++
 .../messages/job/metrics/JobMetricsHeaders.java | 49 ++++++++++++++++
 .../metrics/JobMetricsMessageParameters.java    | 41 +++++++++++++
 .../job/metrics/JobMetricsHandlerTest.java      | 57 +++++++++++++++++++
 .../job/metrics/JobMetricsHeadersTest.java      | 48 ++++++++++++++++
 6 files changed, 265 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4daf9223/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8a26a9fd..a99151a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCach
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
@@ -321,6 +323,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			metricFetcher);
 
+		final JobMetricsHandler jobMetricsHandler = new JobMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
 		final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -378,6 +387,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
 		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
 		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
+		handlers.add(Tuple2.of(JobMetricsHeaders.getInstance(), jobMetricsHandler));
 		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
 		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
 		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf9223/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
new file mode 100644
index 0000000..6a6c197
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
+ */
+public class JobMetricsHandler extends AbstractMetricsHandler<JobMetricsMessageParameters> {
+
+	public JobMetricsHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<DispatcherGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> headers,
+			final MetricFetcher metricFetcher) {
+		super(localRestAddress, leaderRetriever, timeout, headers, JobMetricsHeaders.getInstance(),
+			metricFetcher);
+	}
+
+	@Nullable
+	@Override
+	protected MetricStore.ComponentMetricStore getComponentMetricStore(
+		final HandlerRequest<EmptyRequestBody, JobMetricsMessageParameters> request,
+		final MetricStore metricStore) {
+		return metricStore.getJobMetricStore(request.getPathParameter(JobIDPathParameter.class).toString());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf9223/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeaders.java
new file mode 100644
index 0000000..393a81e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeaders.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+/**
+ * {@link MessageHeaders} for {@link JobMetricsHandler}.
+ */
+public final class JobMetricsHeaders extends AbstractMetricsHeaders<JobMetricsMessageParameters> {
+
+	private static final JobMetricsHeaders INSTANCE = new JobMetricsHeaders();
+
+	private JobMetricsHeaders() {
+	}
+
+	@Override
+	public JobMetricsMessageParameters getUnresolvedMessageParameters() {
+		return new JobMetricsMessageParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/:" + JobIDPathParameter.KEY + "/metrics";
+	}
+
+	public static JobMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf9223/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
new file mode 100644
index 0000000..f8bab83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link JobMetricsHandler}.
+ */
+public class JobMetricsMessageParameters extends JobMessageParameters {
+
+	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singletonList(metricsFilterParameter);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf9223/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandlerTest.java
new file mode 100644
index 0000000..3921736
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobMetricsHandlerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link JobMetricsHandler}.
+ */
+public class JobMetricsHandlerTest extends MetricsHandlerTestBase<JobMetricsHandler> {
+
+	private static final String TEST_JOB_ID = new JobID().toString();
+
+	@Override
+	JobMetricsHandler getMetricsHandler() {
+		return new JobMetricsHandler(
+			TEST_REST_ADDRESS,
+			leaderRetriever,
+			TIMEOUT,
+			TEST_HEADERS,
+			mockMetricFetcher);
+	}
+
+	@Override
+	QueryScopeInfo getQueryScopeInfo() {
+		return new QueryScopeInfo.JobQueryScopeInfo(TEST_JOB_ID);
+	}
+
+	@Override
+	Map<String, String> getPathParameters() {
+		Map<String, String> pathParameters = new HashMap<>();
+		pathParameters.put(JobIDPathParameter.KEY, TEST_JOB_ID);
+		return pathParameters;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf9223/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
new file mode 100644
index 0000000..a623eba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JobMetricsHeaders}.
+ */
+public class JobMetricsHeadersTest {
+
+	private final JobMetricsHeaders jobMetricsHeaders = JobMetricsHeaders.getInstance();
+
+	@Test
+	public void testUrl() {
+		assertThat(jobMetricsHeaders.getTargetRestEndpointURL(),
+			equalTo("/jobs/:" + JobIDPathParameter.KEY + "/metrics"));
+	}
+
+	@Test
+	public void testMessageParameters() {
+		assertThat(jobMetricsHeaders.getUnresolvedMessageParameters(), instanceOf
+			(JobMetricsMessageParameters.class));
+	}
+
+}


[7/9] flink git commit: [hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler

Posted by tr...@apache.org.
[hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88232a66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88232a66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88232a66

Branch: refs/heads/master
Commit: 88232a66b96d01ecab907590ed9eb2c3a983924e
Parents: 97f6b63
Author: gyao <ga...@data-artisans.com>
Authored: Mon Nov 27 13:32:52 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:55 2017 +0100

----------------------------------------------------------------------
 .../runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88232a66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
index bc238e9..5e51c5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/SubtaskMetricsHandler.java
@@ -35,8 +35,6 @@ import java.util.concurrent.Executor;
  * The handler will then return a list containing the values of the requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  *
- * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
- * "sum", "max", "min" and "avg".
  */
 public class SubtaskMetricsHandler extends AbstractMetricsHandler {
 	private static final String SUBTASK_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/metrics";


[8/9] flink git commit: [hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest

Posted by tr...@apache.org.
[hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79b5bd44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79b5bd44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79b5bd44

Branch: refs/heads/master
Commit: 79b5bd44eb57b2293232c940dcbd35036615b554
Parents: 88232a6
Author: gyao <ga...@data-artisans.com>
Authored: Mon Nov 27 13:34:02 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:55 2017 +0100

----------------------------------------------------------------------
 .../metrics/JobVertexMetricsHeadersTest.java    | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79b5bd44/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
new file mode 100644
index 0000000..fd8283a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JobVertexMetricsHeaders}.
+ */
+public class JobVertexMetricsHeadersTest {
+
+	private final JobVertexMetricsHeaders jobVertexMetricsHeaders = JobVertexMetricsHeaders
+		.getInstance();
+
+	@Test
+	public void testUrl() {
+		assertThat(jobVertexMetricsHeaders.getTargetRestEndpointURL(),
+			equalTo("/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" +
+				JobVertexIdPathParameter.KEY + "/metrics"));
+	}
+
+	@Test
+	public void testMessageParameters() {
+		assertThat(jobVertexMetricsHeaders.getUnresolvedMessageParameters(),
+			instanceOf(JobVertexMetricsMessageParameters.class));
+	}
+
+}