You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:02 UTC

[04/30] flink git commit: [FLINK-7862] [flip6] Add TaskManagerDetailsHandler

[FLINK-7862] [flip6] Add TaskManagerDetailsHandler

Pass MetricQueryServiceRetriever to DispatcherRestEndpoint

This closes #4862.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf68442
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf68442
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf68442

Branch: refs/heads/master
Commit: fdf684427165e8068d4229a340b8e03548e022ef
Parents: 3f7f04a
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 17 17:02:30 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:42 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  26 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  23 +-
 .../flink/runtime/metrics/MetricRegistry.java   |   2 +
 .../flink/runtime/metrics/dump/MetricDump.java  |   9 +
 .../runtime/metrics/dump/QueryScopeInfo.java    |   8 +
 .../resourcemanager/ResourceManager.java        |  30 +++
 .../resourcemanager/ResourceManagerGateway.java |  15 +-
 .../taskmanager/AbstractTaskManagerHandler.java |  78 ++++++
 .../taskmanager/TaskManagerDetailsHandler.java  | 166 ++++++++++++
 .../taskmanager/TaskManagersHandler.java        |  30 +--
 .../taskmanager/TaskManagerDetailsHeaders.java  |  72 ++++++
 .../taskmanager/TaskManagerDetailsInfo.java     |  97 +++++++
 .../taskmanager/TaskManagerIdPathParameter.java |  46 ++++
 .../messages/taskmanager/TaskManagerInfo.java   |  28 ++
 .../TaskManagerMessageParameters.java           |  45 ++++
 .../taskmanager/TaskManagerMetricsInfo.java     | 255 +++++++++++++++++++
 .../taskmanager/TaskManagerDetailsInfoTest.java |  84 ++++++
 .../taskmanager/TaskManagerInfoTest.java        |   1 -
 18 files changed, 985 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index e2d411c..12187e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -49,6 +48,9 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
@@ -65,9 +67,11 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -96,13 +100,16 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	private final ExecutionGraphCache executionGraphCache;
 	private final CheckpointStatsCache checkpointStatsCache;
 
+	private final MetricFetcher<DispatcherGateway> metricFetcher;
+
 	public DispatcherRestEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<DispatcherGateway> leaderRetriever,
 			Configuration clusterConfiguration,
 			RestHandlerConfiguration restConfiguration,
 			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
-			Executor executor) {
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever) {
 		super(endpointConfiguration);
 		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
 		this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
@@ -116,6 +123,12 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 
 		this.checkpointStatsCache = new CheckpointStatsCache(
 			restConfiguration.getMaxCheckpointStatisticCacheEntries());
+
+		this.metricFetcher = new MetricFetcher<>(
+			leaderRetriever,
+			metricQueryServiceRetriever,
+			executor,
+			restConfiguration.getTimeout());
 	}
 
 	@Override
@@ -268,6 +281,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			TaskManagersHeaders.getInstance(),
 			resourceManagerRetriever);
 
+		TaskManagerDetailsHandler<DispatcherGateway> taskManagerDetailsHandler = new TaskManagerDetailsHandler<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			TaskManagerDetailsHeaders.getInstance(),
+			resourceManagerRetriever,
+			metricFetcher);
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -299,6 +320,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
 		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
+		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 8a3cfc2..3feb005 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -38,11 +39,16 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import akka.actor.ActorSystem;
+
 import java.util.Optional;
 import java.util.concurrent.Executor;
 
@@ -92,11 +98,16 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			10,
 			Time.milliseconds(50L));
 
+		// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+		final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
 		dispatcherRestEndpoint = createDispatcherRestEndpoint(
 			configuration,
 			dispatcherGatewayRetriever,
 			resourceManagerGatewayRetriever,
-			rpcService.getExecutor());
+			rpcService.getExecutor(),
+			new AkkaQueryServiceRetriever(actorSystem, timeout));
 
 		LOG.debug("Starting Dispatcher REST endpoint.");
 		dispatcherRestEndpoint.start();
@@ -179,15 +190,19 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			Configuration configuration,
 			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			Executor executor) throws Exception {
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
+
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
 
 		return new DispatcherRestEndpoint(
 			RestServerEndpointConfiguration.fromConfiguration(configuration),
 			dispatcherGatewayRetriever,
 			configuration,
-			RestHandlerConfiguration.fromConfiguration(configuration),
+			restHandlerConfiguration,
 			resourceManagerGatewayRetriever,
-			executor);
+			executor,
+			metricQueryServiceRetriever);
 	}
 
 	protected Dispatcher createDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 782d66a..e0c2667 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -27,6 +27,8 @@ import javax.annotation.Nullable;
 
 /**
  * Interface for a metric registry.
+
+				LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath);
  */
 public interface MetricRegistry {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
index 202e453..c2d1eea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -47,6 +47,15 @@ public abstract class MetricDump {
 	 */
 	public abstract byte getCategory();
 
+	@Override
+	public String toString() {
+		return "MetricDump{" +
+			"scopeInfo=" + scopeInfo +
+			", name='" + name + '\'' +
+			", category='" + getCategory() + '\'' +
+			'}';
+	}
+
 	/**
 	 * Container for the value of a {@link org.apache.flink.metrics.Counter}.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index 9af9d78..e8a5a84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -51,6 +51,14 @@ public abstract class QueryScopeInfo {
      */
 	public abstract byte getCategory();
 
+	@Override
+	public String toString() {
+		return "QueryScopeInfo{" +
+			"scope='" + scope + '\'' +
+			", category='" + getCategory() + '\'' +
+			'}';
+	}
+
 	protected String concatScopes(String additionalScope) {
 		return scope.isEmpty()
 			? additionalScope

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f24cdc2..42fed29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -514,6 +514,36 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@Override
+	public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+
+		Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry = null;
+
+		// TODO: Implement more efficiently
+		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerEntry : taskExecutors.entrySet()) {
+			if (Objects.equals(workerEntry.getValue().getInstanceID(), instanceId)) {
+				taskExecutorEntry = workerEntry;
+				break;
+			}
+		}
+
+		if (taskExecutorEntry == null) {
+			return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + instanceId + " is not registered."));
+		} else {
+			WorkerRegistration<?> taskExecutor = taskExecutorEntry.getValue();
+			final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
+				taskExecutor.getInstanceID(),
+				taskExecutor.getTaskExecutorGateway().getAddress(),
+				taskExecutor.getDataPort(),
+				taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
+				slotManager.getNumberRegisteredSlotsOf(instanceId),
+				slotManager.getNumberFreeSlotsOf(instanceId),
+				taskExecutor.getHardwareDescription());
+
+			return CompletableFuture.completedFuture(taskManagerInfo);
+		}
+	}
+
+	@Override
 	public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
 		final int numberSlots = slotManager.getNumberRegisteredSlots();
 		final int numberFreeSlots = slotManager.getNumberFreeSlots();

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index c8eb012..7b95de7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -27,14 +27,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 
@@ -170,10 +170,19 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	/**
 	 * Requests information about the registered {@link TaskExecutor}.
 	 *
-	 * @param timeout for the rpc.
+	 * @param timeout of the request
 	 * @return Future collection of TaskManager information
 	 */
 	CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(@RpcTimeout Time timeout);
+
+	/**
+	 * Requests information about the given {@link TaskExecutor}.
+	 *
+	 * @param instanceId identifying the TaskExecutor for which to return information
+	 * @param timeout of the request
+	 * @return Future TaskManager information
+	 */
+	CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, @RpcTimeout Time timeout);
 	 
 	/**
 	 * Requests the resource overview. The resource overview provides information about the

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java
new file mode 100644
index 0000000..c930bfc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for TaskManager related REST handler which need access to the {@link ResourceManager}.
+ *
+ * @param <T> type of the {@link RestfulGateway}
+ * @param <R> request type
+ * @param <P> response type
+ * @param <M> message parameters type
+ */
+abstract class AbstractTaskManagerHandler<T extends RestfulGateway, R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<T, R, P, M> {
+
+	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+	protected AbstractTaskManagerHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<R, P, M> messageHeaders,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+		this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+	}
+
+	@Override
+	protected CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request, @Nonnull T gateway) throws RestHandlerException {
+		Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
+
+		ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
+			() -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
+
+		return handleRequest(request, resourceManagerGateway);
+	}
+
+	protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request, @Nonnull ResourceManagerGateway gateway) throws RestHandlerException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
new file mode 100644
index 0000000..bca2961
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler which serves detailed TaskManager information.
+ *
+ * @param <T> type of the owning {@link RestfulGateway}
+ */
+public class TaskManagerDetailsHandler<T extends RestfulGateway> extends AbstractTaskManagerHandler<T, EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
+
+	private final MetricFetcher metricFetcher;
+	private final MetricStore metricStore;
+
+	public TaskManagerDetailsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> messageHeaders,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			MetricFetcher metricFetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, resourceManagerGatewayRetriever);
+
+		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
+		this.metricStore = metricFetcher.getMetricStore();
+	}
+
+	@Override
+	protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
+			@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
+		final InstanceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
+
+		CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);
+
+		metricFetcher.update();
+
+		return taskManagerInfoFuture.thenApply(
+			(TaskManagerInfo taskManagerInfo) -> {
+				// the MetricStore is not yet thread safe, therefore we still have to synchronize it
+				synchronized (metricStore) {
+					final MetricStore.TaskManagerMetricStore tmMetrics = metricStore.getTaskManagerMetricStore(taskManagerInstanceId.toString());
+
+					final TaskManagerMetricsInfo taskManagerMetricsInfo;
+
+					if (tmMetrics != null) {
+						log.debug("Create metrics info for TaskManager {}.", taskManagerInstanceId);
+						taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
+					} else {
+						log.debug("No metrics for TaskManager {}.", taskManagerInstanceId);
+						taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
+					}
+
+					return new TaskManagerDetailsInfo(
+						taskManagerInfo,
+						taskManagerMetricsInfo);
+				}
+			});
+	}
+
+	private static TaskManagerMetricsInfo createTaskManagerMetricsInfo(MetricStore.TaskManagerMetricStore tmMetrics) {
+
+		Preconditions.checkNotNull(tmMetrics);
+
+		long heapUsed = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+		long heapCommitted = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+		long heapTotal = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+		long nonHeapUsed = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+		long nonHeapCommitted = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+		long nonHeapTotal = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+		long directCount = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+		long directUsed = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+		long directMax = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+		long mappedCount = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+		long mappedUsed = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+		long mappedMax = Long.valueOf(tmMetrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+		long memorySegmentsAvailable = Long.valueOf(tmMetrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+		long memorySegmentsTotal = Long.valueOf(tmMetrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+		final List<TaskManagerMetricsInfo.GarbageCollectorInfo> garbageCollectorInfo = createGarbageCollectorInfo(tmMetrics);
+
+		return new TaskManagerMetricsInfo(
+			heapUsed,
+			heapCommitted,
+			heapTotal,
+			nonHeapUsed,
+			nonHeapCommitted,
+			nonHeapTotal,
+			directCount,
+			directUsed,
+			directMax,
+			mappedCount,
+			mappedUsed,
+			mappedMax,
+			memorySegmentsAvailable,
+			memorySegmentsTotal,
+			garbageCollectorInfo);
+	}
+
+	private static List<TaskManagerMetricsInfo.GarbageCollectorInfo> createGarbageCollectorInfo(MetricStore.TaskManagerMetricStore taskManagerMetricStore) {
+		Preconditions.checkNotNull(taskManagerMetricStore);
+
+		ArrayList<TaskManagerMetricsInfo.GarbageCollectorInfo> garbageCollectorInfos = new ArrayList<>(taskManagerMetricStore.garbageCollectorNames.size());
+
+		for (String garbageCollectorName: taskManagerMetricStore.garbageCollectorNames) {
+			final String count = taskManagerMetricStore.getMetric("Status.JVM.GarbageCollector." + garbageCollectorName + ".Count", null);
+			final String time = taskManagerMetricStore.getMetric("Status.JVM.GarbageCollector." + garbageCollectorName + ".Time", null);
+
+			if (count != null && time != null) {
+				garbageCollectorInfos.add(
+					new TaskManagerMetricsInfo.GarbageCollectorInfo(
+						garbageCollectorName,
+						Long.valueOf(count),
+						Long.valueOf(time)));
+			}
+		}
+
+		return garbageCollectorInfos;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
index 484b4ec..c8a5c8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.taskmanager;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -29,22 +28,16 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nonnull;
 
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Returns an overview over all registered TaskManagers of the cluster.
  */
-public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
-
-	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+public class TaskManagersHandler<T extends RestfulGateway> extends AbstractTaskManagerHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
 
 	public TaskManagersHandler(
 			CompletableFuture<String> localRestAddress,
@@ -53,21 +46,18 @@ public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestH
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,
 			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
-		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
-
-		this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			messageHeaders,
+			resourceManagerGatewayRetriever);
 	}
 
 	@Override
-	protected CompletableFuture<TaskManagersInfo> handleRequest(
-			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
-			@Nonnull T gateway) throws RestHandlerException {
-		Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
-
-		ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
-			() -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
-
-		return resourceManagerGateway
+	protected CompletableFuture<TaskManagersInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
+		return gateway
 			.requestTaskManagerInfo(timeout)
 			.thenApply(TaskManagersInfo::new);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
new file mode 100644
index 0000000..5ff4288
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Headers for the {@link TaskManagerDetailsHandler} which serves the TaskManager details.
+ */
+public class TaskManagerDetailsHeaders implements MessageHeaders<EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
+
+	private static final TaskManagerDetailsHeaders INSTANCE = new TaskManagerDetailsHeaders();
+
+	public static final String URL = "/taskmanagers/:" + TaskManagerIdPathParameter.KEY;
+
+	private TaskManagerDetailsHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<TaskManagerDetailsInfo> getResponseClass() {
+		return TaskManagerDetailsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public TaskManagerMessageParameters getUnresolvedMessageParameters() {
+		return new TaskManagerMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static TaskManagerDetailsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
new file mode 100644
index 0000000..3764a26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Objects;
+
+/**
+ * Message containing base information about a {@link TaskExecutor} and more
+ * detailed metrics.
+ */
+public class TaskManagerDetailsInfo extends TaskManagerInfo {
+
+	public static final String FIELD_NAME_METRICS = "metrics";
+
+	@JsonProperty(FIELD_NAME_METRICS)
+	private final TaskManagerMetricsInfo taskManagerMetrics;
+
+	@JsonCreator
+	public TaskManagerDetailsInfo(
+			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonProperty(FIELD_NAME_ADDRESS) String address,
+			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
+			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
+			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
+			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
+			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
+			@JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
+		super(
+			instanceId,
+			address,
+			dataPort,
+			lastHeartbeat,
+			numberSlots,
+			numberAvailableSlots,
+			hardwareDescription);
+
+		this.taskManagerMetrics = Preconditions.checkNotNull(taskManagerMetrics);
+	}
+
+	public TaskManagerDetailsInfo(TaskManagerInfo taskManagerInfo, TaskManagerMetricsInfo taskManagerMetrics) {
+		this(
+			taskManagerInfo.getInstanceId(),
+			taskManagerInfo.getAddress(),
+			taskManagerInfo.getDataPort(),
+			taskManagerInfo.getLastHeartbeat(),
+			taskManagerInfo.getNumberSlots(),
+			taskManagerInfo.getNumberAvailableSlots(),
+			taskManagerInfo.getHardwareDescription(),
+			taskManagerMetrics);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		TaskManagerDetailsInfo that = (TaskManagerDetailsInfo) o;
+		return Objects.equals(taskManagerMetrics, that.taskManagerMetrics);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), taskManagerMetrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
new file mode 100644
index 0000000..2ff7909
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * TaskManager id path parameter used by TaskManager related handlers.
+ */
+public class TaskManagerIdPathParameter extends MessagePathParameter<InstanceID> {
+
+	public static final String KEY = "taskmanagerid";
+
+	protected TaskManagerIdPathParameter() {
+		super(KEY);
+	}
+
+	@Override
+	protected InstanceID convertFromString(String value) throws ConversionException {
+		return new InstanceID(StringUtils.hexStringToByte(value));
+	}
+
+	@Override
+	protected String convertToString(InstanceID value) {
+		return StringUtils.byteToHexString(value.getBytes());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index 00c066e..1a07273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -92,6 +92,34 @@ public class TaskManagerInfo implements ResponseBody {
 		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
 	}
 
+	public InstanceID getInstanceId() {
+		return instanceId;
+	}
+
+	public String getAddress() {
+		return address;
+	}
+
+	public int getDataPort() {
+		return dataPort;
+	}
+
+	public long getLastHeartbeat() {
+		return lastHeartbeat;
+	}
+
+	public int getNumberSlots() {
+		return numberSlots;
+	}
+
+	public int getNumberAvailableSlots() {
+		return numberAvailableSlots;
+	}
+
+	public HardwareDescription getHardwareDescription() {
+		return hardwareDescription;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
new file mode 100644
index 0000000..59408a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message parameter for a TaskManager specific handler. The message parameters
+ * define the TaskManager id as a path parameter.
+ */
+public class TaskManagerMessageParameters extends MessageParameters {
+
+	private TaskManagerIdPathParameter taskManagerIdParameter = new TaskManagerIdPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.singleton(taskManagerIdParameter);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.emptySet();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
new file mode 100644
index 0000000..c4a1f2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Contains information about the TaskManager metrics.
+ */
+public class TaskManagerMetricsInfo {
+
+	public static final String FIELD_NAME_HEAP_USED = "heapUsed";
+
+	public static final String FIELD_NAME_HEAP_COMMITTED = "heapCommitted";
+
+	public static final String FIELD_NAME_HEAP_MAX = "heapMax";
+
+	public static final String FIELD_NAME_NON_HEAP_USED = "nonHeapUsed";
+
+	public static final String FIELD_NAME_NON_HEAP_COMMITTED = "nonHeapCommitted";
+
+	public static final String FIELD_NAME_NON_HEAP_MAX = "nonHeapMax";
+
+	public static final String FIELD_NAME_DIRECT_COUNT = "directCount";
+
+	public static final String FIELD_NAME_DIRECT_USED = "directUsed";
+
+	public static final String FIELD_NAME_DIRECT_MAX = "directMax";
+
+	public static final String FIELD_NAME_MAPPED_COUNT = "mappedCount";
+
+	public static final String FIELD_NAME_MAPPED_USED = "mappedUsed";
+
+	public static final String FIELD_NAME_MAPPED_MAX = "mappedMax";
+
+	public static final String FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE = "memorySegmentsAvailable";
+
+	public static final String FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL = "memorySegmentsTotal";
+
+	public static final String FIELD_NAME_GARBAGE_COLLECTORS = "garbageCollectors";
+
+	// --------- Heap memory -------------
+
+	@JsonProperty(FIELD_NAME_HEAP_USED)
+	private final long heapUsed;
+
+	@JsonProperty(FIELD_NAME_HEAP_COMMITTED)
+	private final long heapCommitted;
+
+	@JsonProperty(FIELD_NAME_HEAP_MAX)
+	private final long heapMax;
+
+	// --------- Non heap memory -------------
+
+	@JsonProperty(FIELD_NAME_NON_HEAP_USED)
+	private final long nonHeapUsed;
+
+	@JsonProperty(FIELD_NAME_NON_HEAP_COMMITTED)
+	private final long nonHeapCommitted;
+
+	@JsonProperty(FIELD_NAME_NON_HEAP_MAX)
+	private final long nonHeapMax;
+
+	// --------- Direct buffer pool -------------
+
+	@JsonProperty(FIELD_NAME_DIRECT_COUNT)
+	private final long directCount;
+
+	@JsonProperty(FIELD_NAME_DIRECT_USED)
+	private final long directUsed;
+
+	@JsonProperty(FIELD_NAME_DIRECT_MAX)
+	private final long directMax;
+
+	// --------- Mapped buffer pool -------------
+
+	@JsonProperty(FIELD_NAME_MAPPED_COUNT)
+	private final long mappedCount;
+
+	@JsonProperty(FIELD_NAME_MAPPED_USED)
+	private final long mappedUsed;
+
+	@JsonProperty(FIELD_NAME_MAPPED_MAX)
+	private final long mappedMax;
+
+	// --------- Network buffer pool -------------
+
+	@JsonProperty(FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE)
+	private final long memorySegmentsAvailable;
+
+	@JsonProperty(FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL)
+	private final long memorySegmentsTotal;
+
+	// --------- Garbage collectors -------------
+
+	@JsonProperty(FIELD_NAME_GARBAGE_COLLECTORS)
+	private final List<GarbageCollectorInfo> garbageCollectorsInfo;
+
+	@JsonCreator
+	public TaskManagerMetricsInfo(
+			@JsonProperty(FIELD_NAME_HEAP_USED) long heapUsed,
+			@JsonProperty(FIELD_NAME_HEAP_COMMITTED) long heapCommitted,
+			@JsonProperty(FIELD_NAME_HEAP_MAX) long heapMax,
+			@JsonProperty(FIELD_NAME_NON_HEAP_USED) long nonHeapUsed,
+			@JsonProperty(FIELD_NAME_NON_HEAP_COMMITTED) long nonHeapCommitted,
+			@JsonProperty(FIELD_NAME_NON_HEAP_MAX) long nonHeapMax,
+			@JsonProperty(FIELD_NAME_DIRECT_COUNT) long directCount,
+			@JsonProperty(FIELD_NAME_DIRECT_USED) long directUsed,
+			@JsonProperty(FIELD_NAME_DIRECT_MAX) long directMax,
+			@JsonProperty(FIELD_NAME_MAPPED_COUNT) long mappedCount,
+			@JsonProperty(FIELD_NAME_MAPPED_USED) long mappedUsed,
+			@JsonProperty(FIELD_NAME_MAPPED_MAX) long mappedMax,
+			@JsonProperty(FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE) long memorySegmentsAvailable,
+			@JsonProperty(FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL) long memorySegmentsTotal,
+			@JsonProperty(FIELD_NAME_GARBAGE_COLLECTORS) List<GarbageCollectorInfo> garbageCollectorsInfo) {
+		this.heapUsed = heapUsed;
+		this.heapCommitted = heapCommitted;
+		this.heapMax = heapMax;
+		this.nonHeapUsed = nonHeapUsed;
+		this.nonHeapCommitted = nonHeapCommitted;
+		this.nonHeapMax = nonHeapMax;
+		this.directCount = directCount;
+		this.directUsed = directUsed;
+		this.directMax = directMax;
+		this.mappedCount = mappedCount;
+		this.mappedUsed = mappedUsed;
+		this.mappedMax = mappedMax;
+		this.memorySegmentsAvailable = memorySegmentsAvailable;
+		this.memorySegmentsTotal = memorySegmentsTotal;
+		this.garbageCollectorsInfo = Preconditions.checkNotNull(garbageCollectorsInfo);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		TaskManagerMetricsInfo that = (TaskManagerMetricsInfo) o;
+		return heapUsed == that.heapUsed &&
+			heapCommitted == that.heapCommitted &&
+			heapMax == that.heapMax &&
+			nonHeapUsed == that.nonHeapUsed &&
+			nonHeapCommitted == that.nonHeapCommitted &&
+			nonHeapMax == that.nonHeapMax &&
+			directCount == that.directCount &&
+			directUsed == that.directUsed &&
+			directMax == that.directMax &&
+			mappedCount == that.mappedCount &&
+			mappedUsed == that.mappedUsed &&
+			mappedMax == that.mappedMax &&
+			memorySegmentsAvailable == that.memorySegmentsAvailable &&
+			memorySegmentsTotal == that.memorySegmentsTotal &&
+			Objects.equals(garbageCollectorsInfo, that.garbageCollectorsInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax, directCount, directUsed, directMax, mappedCount, mappedUsed, mappedMax, memorySegmentsAvailable, memorySegmentsTotal, garbageCollectorsInfo);
+	}
+
+	/**
+	 * Information about the garbage collector metrics.
+	 */
+	public static class GarbageCollectorInfo {
+
+		public static final String FIELD_NAME_NAME = "name";
+
+		public static final String FIELD_NAME_COUNT = "count";
+
+		public static final String FIELD_NAME_TIME = "time";
+
+		@JsonProperty(FIELD_NAME_NAME)
+		private final String name;
+
+		@JsonProperty(FIELD_NAME_COUNT)
+		private final long count;
+
+		@JsonProperty(FIELD_NAME_TIME)
+		private final long time;
+
+		@JsonCreator
+		public GarbageCollectorInfo(
+				@JsonProperty(FIELD_NAME_NAME) String name,
+				@JsonProperty(FIELD_NAME_COUNT) long count,
+				@JsonProperty(FIELD_NAME_TIME) long time) {
+			this.name = Preconditions.checkNotNull(name);
+			this.count = count;
+			this.time = time;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			GarbageCollectorInfo that = (GarbageCollectorInfo) o;
+			return count == that.count &&
+				time == that.time &&
+				Objects.equals(name, that.name);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(name, count, time);
+		}
+	}
+
+	public static TaskManagerMetricsInfo empty() {
+		return new TaskManagerMetricsInfo(
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			0L,
+			Collections.emptyList());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
new file mode 100644
index 0000000..0c85e42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests (un)marshalling of {@link TaskManagerDetailsInfo}.
+ */
+public class TaskManagerDetailsInfoTest extends RestResponseMarshallingTestBase<TaskManagerDetailsInfo> {
+
+	private static final Random random = new Random();
+
+	@Override
+	protected Class<TaskManagerDetailsInfo> getTestResponseClass() {
+		return TaskManagerDetailsInfo.class;
+	}
+
+	@Override
+	protected TaskManagerDetailsInfo getTestResponseInstance() throws Exception {
+		final TaskManagerInfo taskManagerInfo = TaskManagerInfoTest.createRandomTaskManagerInfo();
+		final TaskManagerMetricsInfo taskManagerMetricsInfo = createRandomTaskManagerMetricsInfo();
+
+		return new TaskManagerDetailsInfo(
+			taskManagerInfo,
+			taskManagerMetricsInfo);
+	}
+
+	static TaskManagerMetricsInfo createRandomTaskManagerMetricsInfo() {
+		final List<TaskManagerMetricsInfo.GarbageCollectorInfo> garbageCollectorsInfo = createRandomGarbageCollectorsInfo();
+
+		return new TaskManagerMetricsInfo(
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			garbageCollectorsInfo);
+	}
+
+	static List<TaskManagerMetricsInfo.GarbageCollectorInfo> createRandomGarbageCollectorsInfo() {
+		final int numberGCs = random.nextInt(10);
+		final List<TaskManagerMetricsInfo.GarbageCollectorInfo> garbageCollectorInfos = new ArrayList<>(numberGCs);
+
+		for (int i = 0; i < numberGCs; i++) {
+			garbageCollectorInfos.add(new TaskManagerMetricsInfo.GarbageCollectorInfo(
+				UUID.randomUUID().toString(),
+				random.nextLong(),
+				random.nextLong()));
+		}
+
+		return garbageCollectorInfos;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index 554e990..12514b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.messages.taskmanager;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
-import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 
 import java.util.Random;
 import java.util.UUID;