You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:02 UTC
[04/30] flink git commit: [FLINK-7862] [flip6] Add TaskManagerDetailsHandler
[FLINK-7862] [flip6] Add TaskManagerDetailsHandler
Pass MetricQueryServiceRetriever to DispatcherRestEndpoint
This closes #4862.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf68442
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf68442
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf68442
Branch: refs/heads/master
Commit: fdf684427165e8068d4229a340b8e03548e022ef
Parents: 3f7f04a
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 17 17:02:30 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:42 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 26 +-
.../entrypoint/SessionClusterEntrypoint.java | 23 +-
.../flink/runtime/metrics/MetricRegistry.java | 2 +
.../flink/runtime/metrics/dump/MetricDump.java | 9 +
.../runtime/metrics/dump/QueryScopeInfo.java | 8 +
.../resourcemanager/ResourceManager.java | 30 +++
.../resourcemanager/ResourceManagerGateway.java | 15 +-
.../taskmanager/AbstractTaskManagerHandler.java | 78 ++++++
.../taskmanager/TaskManagerDetailsHandler.java | 166 ++++++++++++
.../taskmanager/TaskManagersHandler.java | 30 +--
.../taskmanager/TaskManagerDetailsHeaders.java | 72 ++++++
.../taskmanager/TaskManagerDetailsInfo.java | 97 +++++++
.../taskmanager/TaskManagerIdPathParameter.java | 46 ++++
.../messages/taskmanager/TaskManagerInfo.java | 28 ++
.../TaskManagerMessageParameters.java | 45 ++++
.../taskmanager/TaskManagerMetricsInfo.java | 255 +++++++++++++++++++
.../taskmanager/TaskManagerDetailsInfoTest.java | 84 ++++++
.../taskmanager/TaskManagerInfoTest.java | 1 -
18 files changed, 985 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index e2d411c..12187e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -49,6 +48,9 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
@@ -65,9 +67,11 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
@@ -96,13 +100,16 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;
+ private final MetricFetcher<DispatcherGateway> metricFetcher;
+
public DispatcherRestEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Configuration clusterConfiguration,
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
- Executor executor) {
+ Executor executor,
+ MetricQueryServiceRetriever metricQueryServiceRetriever) {
super(endpointConfiguration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
@@ -116,6 +123,12 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
this.checkpointStatsCache = new CheckpointStatsCache(
restConfiguration.getMaxCheckpointStatisticCacheEntries());
+
+ this.metricFetcher = new MetricFetcher<>(
+ leaderRetriever,
+ metricQueryServiceRetriever,
+ executor,
+ restConfiguration.getTimeout());
}
@Override
@@ -268,6 +281,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
TaskManagersHeaders.getInstance(),
resourceManagerRetriever);
+ TaskManagerDetailsHandler<DispatcherGateway> taskManagerDetailsHandler = new TaskManagerDetailsHandler<>(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ TaskManagerDetailsHeaders.getInstance(),
+ resourceManagerRetriever,
+ metricFetcher);
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -299,6 +320,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
+ handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 8a3cfc2..3feb005 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.entrypoint;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -38,11 +39,16 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import akka.actor.ActorSystem;
+
import java.util.Optional;
import java.util.concurrent.Executor;
@@ -92,11 +98,16 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
10,
Time.milliseconds(50L));
+ // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+ final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+ final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
dispatcherRestEndpoint = createDispatcherRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
- rpcService.getExecutor());
+ rpcService.getExecutor(),
+ new AkkaQueryServiceRetriever(actorSystem, timeout));
LOG.debug("Starting Dispatcher REST endpoint.");
dispatcherRestEndpoint.start();
@@ -179,15 +190,19 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- Executor executor) throws Exception {
+ Executor executor,
+ MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
+
+ final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
return new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
configuration,
- RestHandlerConfiguration.fromConfiguration(configuration),
+ restHandlerConfiguration,
resourceManagerGatewayRetriever,
- executor);
+ executor,
+ metricQueryServiceRetriever);
}
protected Dispatcher createDispatcher(
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 782d66a..e0c2667 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -27,6 +27,8 @@ import javax.annotation.Nullable;
/**
* Interface for a metric registry.
+
+ LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath);
*/
public interface MetricRegistry {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
index 202e453..c2d1eea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -47,6 +47,15 @@ public abstract class MetricDump {
*/
public abstract byte getCategory();
+ @Override
+ public String toString() {
+     // Renders as MetricDump{scopeInfo=..., name='...', category='...'}.
+     final String scopePart = "scopeInfo=" + scopeInfo;
+     final String namePart = "name='" + name + '\'';
+     final String categoryPart = "category='" + getCategory() + '\'';
+
+     return "MetricDump{" + scopePart + ", " + namePart + ", " + categoryPart + '}';
+ }
+
/**
* Container for the value of a {@link org.apache.flink.metrics.Counter}.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index 9af9d78..e8a5a84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -51,6 +51,14 @@ public abstract class QueryScopeInfo {
*/
public abstract byte getCategory();
+ @Override
+ public String toString() {
+     // Renders as QueryScopeInfo{scope='...', category='...'}.
+     final String scopePart = "scope='" + scope + '\'';
+     final String categoryPart = "category='" + getCategory() + '\'';
+
+     return "QueryScopeInfo{" + scopePart + ", " + categoryPart + '}';
+ }
+
protected String concatScopes(String additionalScope) {
return scope.isEmpty()
? additionalScope
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f24cdc2..42fed29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -514,6 +514,36 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
@Override
+ public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+     // TODO: Implement more efficiently
+     // Scan the registered task executors for the one carrying the requested instance id.
+     for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> candidate : taskExecutors.entrySet()) {
+         final WorkerRegistration<?> registration = candidate.getValue();
+
+         if (!Objects.equals(registration.getInstanceID(), instanceId)) {
+             continue;
+         }
+
+         // First match wins: assemble the info from the registration, the heartbeat
+         // manager and the slot manager, and answer with an already completed future.
+         final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
+             registration.getInstanceID(),
+             registration.getTaskExecutorGateway().getAddress(),
+             registration.getDataPort(),
+             taskManagerHeartbeatManager.getLastHeartbeatFrom(candidate.getKey()),
+             slotManager.getNumberRegisteredSlotsOf(instanceId),
+             slotManager.getNumberFreeSlotsOf(instanceId),
+             registration.getHardwareDescription());
+
+         return CompletableFuture.completedFuture(taskManagerInfo);
+     }
+
+     // No registration matched the given instance id.
+     return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + instanceId + " is not registered."));
+ }
+
+ @Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
final int numberSlots = slotManager.getNumberRegisteredSlots();
final int numberFreeSlots = slotManager.getNumberFreeSlots();
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index c8eb012..7b95de7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -27,14 +27,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
@@ -170,10 +170,19 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
/**
* Requests information about the registered {@link TaskExecutor}.
*
- * @param timeout for the rpc.
+ * @param timeout of the request
* @return Future collection of TaskManager information
*/
CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(@RpcTimeout Time timeout);
+
+ /**
+ * Requests information about the given {@link TaskExecutor}.
+ *
+ * <p>This is the single-instance counterpart of {@link #requestTaskManagerInfo(Time)},
+ * which returns the information of all registered TaskExecutors.
+ *
+ * @param instanceId identifying the TaskExecutor for which to return information
+ * @param timeout of the request
+ * @return Future TaskManager information; the {@link ResourceManager} implementation
+ *         completes it exceptionally if no TaskExecutor is registered under the given
+ *         instance id
+ */
+ CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, @RpcTimeout Time timeout);
/**
* Requests the resource overview. The resource overview provides information about the
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java
new file mode 100644
index 0000000..c930bfc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Parent class of the TaskManager related REST handlers. For every incoming request it
+ * looks up the currently known {@link ResourceManager} gateway and passes it on to the
+ * subclass, which answers the request against that gateway.
+ *
+ * @param <T> type of the {@link RestfulGateway}
+ * @param <R> request type
+ * @param <P> response type
+ * @param <M> message parameters type
+ */
+abstract class AbstractTaskManagerHandler<T extends RestfulGateway, R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<T, R, P, M> {
+
+    /** Source of the ResourceManager gateway; queried anew for each request. */
+    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+    protected AbstractTaskManagerHandler(
+            CompletableFuture<String> localRestAddress,
+            GatewayRetriever<? extends T> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<R, P, M> messageHeaders,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+        super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+        this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+    }
+
+    /**
+     * Resolves the ResourceManager gateway and delegates to
+     * {@link #handleRequest(HandlerRequest, ResourceManagerGateway)}.
+     *
+     * @throws RestHandlerException with status NOT_FOUND if no ResourceManager gateway is available right now
+     */
+    @Override
+    protected CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request, @Nonnull T gateway) throws RestHandlerException {
+        final Optional<ResourceManagerGateway> currentResourceManager = resourceManagerGatewayRetriever.getNow();
+
+        if (!currentResourceManager.isPresent()) {
+            throw new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND);
+        }
+
+        return handleRequest(request, currentResourceManager.get());
+    }
+
+    /**
+     * Answers the given request using the resolved ResourceManager gateway.
+     *
+     * @param request request to handle
+     * @param gateway ResourceManager gateway to answer the request with
+     * @return future of the response
+     * @throws RestHandlerException if the request cannot be handled
+     */
+    protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request, @Nonnull ResourceManagerGateway gateway) throws RestHandlerException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
new file mode 100644
index 0000000..bca2961
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler which serves detailed TaskManager information: the registration information
+ * obtained from the {@link ResourceManagerGateway} combined with the memory, network and
+ * garbage collector metrics found in the {@link MetricStore} of the given {@link MetricFetcher}.
+ *
+ * @param <T> type of the owning {@link RestfulGateway}
+ */
+public class TaskManagerDetailsHandler<T extends RestfulGateway> extends AbstractTaskManagerHandler<T, EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
+
+    /** Fetcher asked to update on every request; its type argument is irrelevant here, hence the wildcard. */
+    private final MetricFetcher<?> metricFetcher;
+
+    /** Store owned by {@link #metricFetcher}; also used as the monitor when reading metrics. */
+    private final MetricStore metricStore;
+
+    public TaskManagerDetailsHandler(
+            CompletableFuture<String> localRestAddress,
+            GatewayRetriever<? extends T> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> messageHeaders,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            MetricFetcher<?> metricFetcher) {
+        super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, resourceManagerGatewayRetriever);
+
+        this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
+        this.metricStore = metricFetcher.getMetricStore();
+    }
+
+    /**
+     * Requests the {@link TaskManagerInfo} of the TaskManager named by the
+     * {@link TaskManagerIdPathParameter} and enriches it with the metrics currently
+     * held by the metric store.
+     *
+     * @param request request carrying the TaskManager id path parameter
+     * @param gateway ResourceManager gateway to ask for the TaskManager information
+     * @return future of the detailed TaskManager information; the metrics part is
+     *         {@link TaskManagerMetricsInfo#empty()} if the store has no entry for the TaskManager
+     */
+    @Override
+    protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
+            @Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
+        final InstanceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
+
+        CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);
+
+        // NOTE(review): the store is read below without waiting for this update, so the
+        // response presumably reflects whatever the store holds at that moment - confirm.
+        metricFetcher.update();
+
+        return taskManagerInfoFuture.thenApply(
+            (TaskManagerInfo taskManagerInfo) -> {
+                // the MetricStore is not yet thread safe, therefore we still have to synchronize it
+                synchronized (metricStore) {
+                    final MetricStore.TaskManagerMetricStore tmMetrics = metricStore.getTaskManagerMetricStore(taskManagerInstanceId.toString());
+
+                    final TaskManagerMetricsInfo taskManagerMetricsInfo;
+
+                    if (tmMetrics != null) {
+                        log.debug("Create metrics info for TaskManager {}.", taskManagerInstanceId);
+                        taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
+                    } else {
+                        log.debug("No metrics for TaskManager {}.", taskManagerInstanceId);
+                        taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
+                    }
+
+                    return new TaskManagerDetailsInfo(
+                        taskManagerInfo,
+                        taskManagerMetricsInfo);
+                }
+            });
+    }
+
+    /**
+     * Builds the metrics part of the response from the given TaskManager metric store.
+     * Metrics missing from the store are reported as 0.
+     */
+    private static TaskManagerMetricsInfo createTaskManagerMetricsInfo(MetricStore.TaskManagerMetricStore tmMetrics) {
+
+        Preconditions.checkNotNull(tmMetrics);
+
+        long heapUsed = getLongMetric(tmMetrics, "Status.JVM.Memory.Heap.Used");
+        long heapCommitted = getLongMetric(tmMetrics, "Status.JVM.Memory.Heap.Committed");
+        long heapTotal = getLongMetric(tmMetrics, "Status.JVM.Memory.Heap.Max");
+
+        long nonHeapUsed = getLongMetric(tmMetrics, "Status.JVM.Memory.NonHeap.Used");
+        long nonHeapCommitted = getLongMetric(tmMetrics, "Status.JVM.Memory.NonHeap.Committed");
+        long nonHeapTotal = getLongMetric(tmMetrics, "Status.JVM.Memory.NonHeap.Max");
+
+        long directCount = getLongMetric(tmMetrics, "Status.JVM.Memory.Direct.Count");
+        long directUsed = getLongMetric(tmMetrics, "Status.JVM.Memory.Direct.MemoryUsed");
+        long directMax = getLongMetric(tmMetrics, "Status.JVM.Memory.Direct.TotalCapacity");
+
+        long mappedCount = getLongMetric(tmMetrics, "Status.JVM.Memory.Mapped.Count");
+        long mappedUsed = getLongMetric(tmMetrics, "Status.JVM.Memory.Mapped.MemoryUsed");
+        long mappedMax = getLongMetric(tmMetrics, "Status.JVM.Memory.Mapped.TotalCapacity");
+
+        long memorySegmentsAvailable = getLongMetric(tmMetrics, "Status.Network.AvailableMemorySegments");
+        long memorySegmentsTotal = getLongMetric(tmMetrics, "Status.Network.TotalMemorySegments");
+
+        final List<TaskManagerMetricsInfo.GarbageCollectorInfo> garbageCollectorInfo = createGarbageCollectorInfo(tmMetrics);
+
+        return new TaskManagerMetricsInfo(
+            heapUsed,
+            heapCommitted,
+            heapTotal,
+            nonHeapUsed,
+            nonHeapCommitted,
+            nonHeapTotal,
+            directCount,
+            directUsed,
+            directMax,
+            mappedCount,
+            mappedUsed,
+            mappedMax,
+            memorySegmentsAvailable,
+            memorySegmentsTotal,
+            garbageCollectorInfo);
+    }
+
+    /**
+     * Reads the named metric as a primitive long, falling back to 0 if the store does not
+     * contain it. Uses {@link Long#parseLong(String)} to avoid a needless box/unbox round trip.
+     */
+    private static long getLongMetric(MetricStore.TaskManagerMetricStore tmMetrics, String metricName) {
+        return Long.parseLong(tmMetrics.getMetric(metricName, "0"));
+    }
+
+    /**
+     * Collects count and time for every garbage collector known to the given store.
+     * Collectors for which either of the two metrics is missing are left out.
+     */
+    private static List<TaskManagerMetricsInfo.GarbageCollectorInfo> createGarbageCollectorInfo(MetricStore.TaskManagerMetricStore taskManagerMetricStore) {
+        Preconditions.checkNotNull(taskManagerMetricStore);
+
+        final List<TaskManagerMetricsInfo.GarbageCollectorInfo> garbageCollectorInfos = new ArrayList<>(taskManagerMetricStore.garbageCollectorNames.size());
+
+        for (String garbageCollectorName : taskManagerMetricStore.garbageCollectorNames) {
+            final String count = taskManagerMetricStore.getMetric("Status.JVM.GarbageCollector." + garbageCollectorName + ".Count", null);
+            final String time = taskManagerMetricStore.getMetric("Status.JVM.GarbageCollector." + garbageCollectorName + ".Time", null);
+
+            if (count != null && time != null) {
+                garbageCollectorInfos.add(
+                    new TaskManagerMetricsInfo.GarbageCollectorInfo(
+                        garbageCollectorName,
+                        Long.parseLong(count),
+                        Long.parseLong(time)));
+            }
+        }
+
+        return garbageCollectorInfos;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
index 484b4ec..c8a5c8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest.handler.taskmanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -29,22 +28,16 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nonnull;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* Returns an overview over all registered TaskManagers of the cluster.
*/
-public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
-
- private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+public class TaskManagersHandler<T extends RestfulGateway> extends AbstractTaskManagerHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
public TaskManagersHandler(
CompletableFuture<String> localRestAddress,
@@ -53,21 +46,18 @@ public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestH
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
- super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
-
- this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ resourceManagerGatewayRetriever);
}
@Override
- protected CompletableFuture<TaskManagersInfo> handleRequest(
- @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
- @Nonnull T gateway) throws RestHandlerException {
- Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
-
- ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
- () -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
-
- return resourceManagerGateway
+ protected CompletableFuture<TaskManagersInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
+ return gateway
.requestTaskManagerInfo(timeout)
.thenApply(TaskManagersInfo::new);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
new file mode 100644
index 0000000..5ff4288
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Headers for the {@link TaskManagerDetailsHandler} which serves the TaskManager details.
+ */
+public class TaskManagerDetailsHeaders implements MessageHeaders<EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
+
+ private static final TaskManagerDetailsHeaders INSTANCE = new TaskManagerDetailsHeaders();
+
+ public static final String URL = "/taskmanagers/:" + TaskManagerIdPathParameter.KEY;
+
+ private TaskManagerDetailsHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<TaskManagerDetailsInfo> getResponseClass() {
+ return TaskManagerDetailsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public TaskManagerMessageParameters getUnresolvedMessageParameters() {
+ return new TaskManagerMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static TaskManagerDetailsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
new file mode 100644
index 0000000..3764a26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.Objects;
+
+/**
+ * Response message which extends the base information of a {@link TaskExecutor}
+ * (see {@link TaskManagerInfo}) by a {@link TaskManagerMetricsInfo}. The metrics are
+ * written to and read from the JSON field named by {@link #FIELD_NAME_METRICS}.
+ */
+public class TaskManagerDetailsInfo extends TaskManagerInfo {
+
+	public static final String FIELD_NAME_METRICS = "metrics";
+
+	/** Metrics part of the message, never {@code null}. */
+	@JsonProperty(FIELD_NAME_METRICS)
+	private final TaskManagerMetricsInfo taskManagerMetrics;
+
+	/**
+	 * Creates the message from its individual fields. This is also the constructor used
+	 * for JSON deserialization.
+	 *
+	 * <p>All arguments except {@code taskManagerMetrics} are passed on unchanged to the
+	 * {@link TaskManagerInfo} constructor.
+	 *
+	 * @param taskManagerMetrics metrics of the TaskManager, must not be {@code null}
+	 */
+	@JsonCreator
+	public TaskManagerDetailsInfo(
+			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonProperty(FIELD_NAME_ADDRESS) String address,
+			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
+			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
+			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
+			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
+			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
+			@JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
+		super(instanceId, address, dataPort, lastHeartbeat, numberSlots, numberAvailableSlots, hardwareDescription);
+
+		this.taskManagerMetrics = Preconditions.checkNotNull(taskManagerMetrics);
+	}
+
+	/**
+	 * Creates the message by copying every field of an existing {@link TaskManagerInfo}
+	 * and attaching the given metrics.
+	 *
+	 * @param taskManagerInfo base information to copy
+	 * @param taskManagerMetrics metrics of the TaskManager, must not be {@code null}
+	 */
+	public TaskManagerDetailsInfo(TaskManagerInfo taskManagerInfo, TaskManagerMetricsInfo taskManagerMetrics) {
+		this(
+			taskManagerInfo.getInstanceId(),
+			taskManagerInfo.getAddress(),
+			taskManagerInfo.getDataPort(),
+			taskManagerInfo.getLastHeartbeat(),
+			taskManagerInfo.getNumberSlots(),
+			taskManagerInfo.getNumberAvailableSlots(),
+			taskManagerInfo.getHardwareDescription(),
+			taskManagerMetrics);
+	}
+
+	/**
+	 * Two instances are equal if they are of the very same class, their
+	 * {@link TaskManagerInfo} parts are equal and their metrics are equal.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+
+		// same short-circuit order as before: null, exact class, then the super class fields
+		if (o == null || o.getClass() != getClass() || !super.equals(o)) {
+			return false;
+		}
+
+		final TaskManagerDetailsInfo other = (TaskManagerDetailsInfo) o;
+		return Objects.equals(taskManagerMetrics, other.taskManagerMetrics);
+	}
+
+	@Override
+	public int hashCode() {
+		final int baseHash = super.hashCode();
+		return Objects.hash(baseHash, taskManagerMetrics);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
new file mode 100644
index 0000000..2ff7909
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * TaskManager id path parameter used by TaskManager related handlers. The id is an
+ * {@link InstanceID} which is represented as a hex string in the URL.
+ */
+public class TaskManagerIdPathParameter extends MessagePathParameter<InstanceID> {
+
+	/** Name of the path parameter as it appears in the REST URL. */
+	public static final String KEY = "taskmanagerid";
+
+	protected TaskManagerIdPathParameter() {
+		super(KEY);
+	}
+
+	/**
+	 * Decodes the hex string taken from the URL into an {@link InstanceID}.
+	 *
+	 * @param value hex representation of the instance id
+	 * @return the decoded instance id
+	 * @throws ConversionException if decoding the value fails with an
+	 *     {@link IllegalArgumentException} (which includes {@link NumberFormatException})
+	 */
+	@Override
+	protected InstanceID convertFromString(String value) throws ConversionException {
+		try {
+			return new InstanceID(StringUtils.hexStringToByte(value));
+		} catch (IllegalArgumentException e) {
+			// The value comes straight from the request URL. Report malformed input through the
+			// declared checked exception instead of leaking an unchecked one to the caller.
+			throw new ConversionException("Could not convert '" + value + "' into a TaskManager id.", e);
+		}
+	}
+
+	/**
+	 * Encodes the given {@link InstanceID} as a hex string.
+	 *
+	 * @param value instance id to encode
+	 * @return hex representation of the id's bytes
+	 */
+	@Override
+	protected String convertToString(InstanceID value) {
+		return StringUtils.byteToHexString(value.getBytes());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index 00c066e..1a07273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -92,6 +92,34 @@ public class TaskManagerInfo implements ResponseBody {
this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
}
+	/**
+	 * Returns the value of the {@code instanceId} field.
+	 */
+	public InstanceID getInstanceId() {
+		return this.instanceId;
+	}
+
+	/**
+	 * Returns the value of the {@code address} field.
+	 */
+	public String getAddress() {
+		return this.address;
+	}
+
+	/**
+	 * Returns the value of the {@code dataPort} field.
+	 */
+	public int getDataPort() {
+		return this.dataPort;
+	}
+
+	/**
+	 * Returns the value of the {@code lastHeartbeat} field.
+	 */
+	public long getLastHeartbeat() {
+		return this.lastHeartbeat;
+	}
+
+	/**
+	 * Returns the value of the {@code numberSlots} field.
+	 */
+	public int getNumberSlots() {
+		return this.numberSlots;
+	}
+
+	/**
+	 * Returns the value of the {@code numberAvailableSlots} field.
+	 */
+	public int getNumberAvailableSlots() {
+		return this.numberAvailableSlots;
+	}
+
+	/**
+	 * Returns the value of the {@code hardwareDescription} field.
+	 */
+	public HardwareDescription getHardwareDescription() {
+		return this.hardwareDescription;
+	}
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
new file mode 100644
index 0000000..59408a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message parameters for a TaskManager specific handler. They consist of a single
+ * path parameter, the TaskManager id, and no query parameters.
+ */
+public class TaskManagerMessageParameters extends MessageParameters {
+
+	/** The TaskManager id path parameter; created once per instance and never replaced. */
+	private final TaskManagerIdPathParameter taskManagerIdParameter = new TaskManagerIdPathParameter();
+
+	/**
+	 * Returns the path parameters of the message.
+	 *
+	 * @return collection containing only the TaskManager id path parameter
+	 */
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.singleton(taskManagerIdParameter);
+	}
+
+	/**
+	 * Returns the query parameters of the message.
+	 *
+	 * @return always an empty collection
+	 */
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.emptySet();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
new file mode 100644
index 0000000..c4a1f2a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Contains information about the TaskManager metrics.
+ */
+public class TaskManagerMetricsInfo {
+
+ public static final String FIELD_NAME_HEAP_USED = "heapUsed";
+
+ public static final String FIELD_NAME_HEAP_COMMITTED = "heapCommitted";
+
+ public static final String FIELD_NAME_HEAP_MAX = "heapMax";
+
+ public static final String FIELD_NAME_NON_HEAP_USED = "nonHeapUsed";
+
+ public static final String FIELD_NAME_NON_HEAP_COMMITTED = "nonHeapCommitted";
+
+ public static final String FIELD_NAME_NON_HEAP_MAX = "nonHeapMax";
+
+ public static final String FIELD_NAME_DIRECT_COUNT = "directCount";
+
+ public static final String FIELD_NAME_DIRECT_USED = "directUsed";
+
+ public static final String FIELD_NAME_DIRECT_MAX = "directMax";
+
+ public static final String FIELD_NAME_MAPPED_COUNT = "mappedCount";
+
+ public static final String FIELD_NAME_MAPPED_USED = "mappedUsed";
+
+ public static final String FIELD_NAME_MAPPED_MAX = "mappedMax";
+
+ public static final String FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE = "memorySegmentsAvailable";
+
+ public static final String FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL = "memorySegmentsTotal";
+
+ public static final String FIELD_NAME_GARBAGE_COLLECTORS = "garbageCollectors";
+
+ // --------- Heap memory -------------
+
+ @JsonProperty(FIELD_NAME_HEAP_USED)
+ private final long heapUsed;
+
+ @JsonProperty(FIELD_NAME_HEAP_COMMITTED)
+ private final long heapCommitted;
+
+ @JsonProperty(FIELD_NAME_HEAP_MAX)
+ private final long heapMax;
+
+ // --------- Non heap memory -------------
+
+ @JsonProperty(FIELD_NAME_NON_HEAP_USED)
+ private final long nonHeapUsed;
+
+ @JsonProperty(FIELD_NAME_NON_HEAP_COMMITTED)
+ private final long nonHeapCommitted;
+
+ @JsonProperty(FIELD_NAME_NON_HEAP_MAX)
+ private final long nonHeapMax;
+
+ // --------- Direct buffer pool -------------
+
+ @JsonProperty(FIELD_NAME_DIRECT_COUNT)
+ private final long directCount;
+
+ @JsonProperty(FIELD_NAME_DIRECT_USED)
+ private final long directUsed;
+
+ @JsonProperty(FIELD_NAME_DIRECT_MAX)
+ private final long directMax;
+
+ // --------- Mapped buffer pool -------------
+
+ @JsonProperty(FIELD_NAME_MAPPED_COUNT)
+ private final long mappedCount;
+
+ @JsonProperty(FIELD_NAME_MAPPED_USED)
+ private final long mappedUsed;
+
+ @JsonProperty(FIELD_NAME_MAPPED_MAX)
+ private final long mappedMax;
+
+ // --------- Network buffer pool -------------
+
+ @JsonProperty(FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE)
+ private final long memorySegmentsAvailable;
+
+ @JsonProperty(FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL)
+ private final long memorySegmentsTotal;
+
+ // --------- Garbage collectors -------------
+
+ @JsonProperty(FIELD_NAME_GARBAGE_COLLECTORS)
+ private final List<GarbageCollectorInfo> garbageCollectorsInfo;
+
+ @JsonCreator
+ public TaskManagerMetricsInfo(
+ @JsonProperty(FIELD_NAME_HEAP_USED) long heapUsed,
+ @JsonProperty(FIELD_NAME_HEAP_COMMITTED) long heapCommitted,
+ @JsonProperty(FIELD_NAME_HEAP_MAX) long heapMax,
+ @JsonProperty(FIELD_NAME_NON_HEAP_USED) long nonHeapUsed,
+ @JsonProperty(FIELD_NAME_NON_HEAP_COMMITTED) long nonHeapCommitted,
+ @JsonProperty(FIELD_NAME_NON_HEAP_MAX) long nonHeapMax,
+ @JsonProperty(FIELD_NAME_DIRECT_COUNT) long directCount,
+ @JsonProperty(FIELD_NAME_DIRECT_USED) long directUsed,
+ @JsonProperty(FIELD_NAME_DIRECT_MAX) long directMax,
+ @JsonProperty(FIELD_NAME_MAPPED_COUNT) long mappedCount,
+ @JsonProperty(FIELD_NAME_MAPPED_USED) long mappedUsed,
+ @JsonProperty(FIELD_NAME_MAPPED_MAX) long mappedMax,
+ @JsonProperty(FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE) long memorySegmentsAvailable,
+ @JsonProperty(FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL) long memorySegmentsTotal,
+ @JsonProperty(FIELD_NAME_GARBAGE_COLLECTORS) List<GarbageCollectorInfo> garbageCollectorsInfo) {
+ this.heapUsed = heapUsed;
+ this.heapCommitted = heapCommitted;
+ this.heapMax = heapMax;
+ this.nonHeapUsed = nonHeapUsed;
+ this.nonHeapCommitted = nonHeapCommitted;
+ this.nonHeapMax = nonHeapMax;
+ this.directCount = directCount;
+ this.directUsed = directUsed;
+ this.directMax = directMax;
+ this.mappedCount = mappedCount;
+ this.mappedUsed = mappedUsed;
+ this.mappedMax = mappedMax;
+ this.memorySegmentsAvailable = memorySegmentsAvailable;
+ this.memorySegmentsTotal = memorySegmentsTotal;
+ this.garbageCollectorsInfo = Preconditions.checkNotNull(garbageCollectorsInfo);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskManagerMetricsInfo that = (TaskManagerMetricsInfo) o;
+ return heapUsed == that.heapUsed &&
+ heapCommitted == that.heapCommitted &&
+ heapMax == that.heapMax &&
+ nonHeapUsed == that.nonHeapUsed &&
+ nonHeapCommitted == that.nonHeapCommitted &&
+ nonHeapMax == that.nonHeapMax &&
+ directCount == that.directCount &&
+ directUsed == that.directUsed &&
+ directMax == that.directMax &&
+ mappedCount == that.mappedCount &&
+ mappedUsed == that.mappedUsed &&
+ mappedMax == that.mappedMax &&
+ memorySegmentsAvailable == that.memorySegmentsAvailable &&
+ memorySegmentsTotal == that.memorySegmentsTotal &&
+ Objects.equals(garbageCollectorsInfo, that.garbageCollectorsInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax, directCount, directUsed, directMax, mappedCount, mappedUsed, mappedMax, memorySegmentsAvailable, memorySegmentsTotal, garbageCollectorsInfo);
+ }
+
+ /**
+ * Information about the garbage collector metrics.
+ */
+ public static class GarbageCollectorInfo {
+
+ public static final String FIELD_NAME_NAME = "name";
+
+ public static final String FIELD_NAME_COUNT = "count";
+
+ public static final String FIELD_NAME_TIME = "time";
+
+ @JsonProperty(FIELD_NAME_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_NAME_COUNT)
+ private final long count;
+
+ @JsonProperty(FIELD_NAME_TIME)
+ private final long time;
+
+ @JsonCreator
+ public GarbageCollectorInfo(
+ @JsonProperty(FIELD_NAME_NAME) String name,
+ @JsonProperty(FIELD_NAME_COUNT) long count,
+ @JsonProperty(FIELD_NAME_TIME) long time) {
+ this.name = Preconditions.checkNotNull(name);
+ this.count = count;
+ this.time = time;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GarbageCollectorInfo that = (GarbageCollectorInfo) o;
+ return count == that.count &&
+ time == that.time &&
+ Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, count, time);
+ }
+ }
+
+ public static TaskManagerMetricsInfo empty() {
+ return new TaskManagerMetricsInfo(
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ Collections.emptyList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
new file mode 100644
index 0000000..0c85e42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfoTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests that a {@link TaskManagerDetailsInfo} can be marshalled and unmarshalled.
+ * The instance under test is filled with random values.
+ */
+public class TaskManagerDetailsInfoTest extends RestResponseMarshallingTestBase<TaskManagerDetailsInfo> {
+
+	private static final Random random = new Random();
+
+	@Override
+	protected Class<TaskManagerDetailsInfo> getTestResponseClass() {
+		return TaskManagerDetailsInfo.class;
+	}
+
+	@Override
+	protected TaskManagerDetailsInfo getTestResponseInstance() throws Exception {
+		// arguments are evaluated left to right: base info first, metrics second
+		return new TaskManagerDetailsInfo(
+			TaskManagerInfoTest.createRandomTaskManagerInfo(),
+			createRandomTaskManagerMetricsInfo());
+	}
+
+	/**
+	 * Creates a {@link TaskManagerMetricsInfo} with random garbage collector entries and
+	 * random values for all of its numeric fields.
+	 */
+	static TaskManagerMetricsInfo createRandomTaskManagerMetricsInfo() {
+		// the garbage collector entries are drawn before the numeric values
+		final List<TaskManagerMetricsInfo.GarbageCollectorInfo> gcInfos = createRandomGarbageCollectorsInfo();
+
+		// one random value per numeric constructor argument, in argument order
+		final long[] values = new long[14];
+		for (int index = 0; index < values.length; index++) {
+			values[index] = random.nextLong();
+		}
+
+		return new TaskManagerMetricsInfo(
+			values[0], values[1], values[2],
+			values[3], values[4], values[5],
+			values[6], values[7], values[8],
+			values[9], values[10], values[11],
+			values[12], values[13],
+			gcInfos);
+	}
+
+	/**
+	 * Creates between 0 and 9 {@link TaskManagerMetricsInfo.GarbageCollectorInfo} entries,
+	 * each with a random UUID as name and random count and time values.
+	 */
+	static List<TaskManagerMetricsInfo.GarbageCollectorInfo> createRandomGarbageCollectorsInfo() {
+		final int targetSize = random.nextInt(10);
+		final List<TaskManagerMetricsInfo.GarbageCollectorInfo> result = new ArrayList<>(targetSize);
+
+		while (result.size() < targetSize) {
+			final String name = UUID.randomUUID().toString();
+			final long count = random.nextLong();
+			final long time = random.nextLong();
+
+			result.add(new TaskManagerMetricsInfo.GarbageCollectorInfo(name, count, time));
+		}
+
+		return result;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf68442/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index 554e990..12514b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.messages.taskmanager;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
-import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import java.util.Random;
import java.util.UUID;