You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:03 UTC

[05/30] flink git commit: [hotfix] Move TaskManagersHandler to rest.handler.taskmanager and messages to rest.messages.taskmanager

[hotfix] Move TaskManagersHandler to rest.handler.taskmanager and messages to rest.messages.taskmanager

Move TaskManager messages to rest.messages.taskmanager

Move TaskManager message tests to rest.messages.taskmanager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f7f04a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f7f04a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f7f04a9

Branch: refs/heads/master
Commit: 3f7f04a995587f78185710ce8472d1b2e8a536f3
Parents: ac5f5b5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 17 15:03:41 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:42 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |   4 +-
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../rest/handler/TaskManagersHandler.java       |  71 -----------
 .../handler/legacy/TaskManagersHandler.java     |   2 +-
 .../taskmanager/TaskManagersHandler.java        |  74 +++++++++++
 .../runtime/rest/messages/TaskManagerInfo.java  | 123 ------------------
 .../rest/messages/TaskManagersHeaders.java      |  70 -----------
 .../runtime/rest/messages/TaskManagersInfo.java |  65 ----------
 .../messages/taskmanager/TaskManagerInfo.java   | 124 +++++++++++++++++++
 .../taskmanager/TaskManagersHeaders.java        |  73 +++++++++++
 .../messages/taskmanager/TaskManagersInfo.java  |  66 ++++++++++
 .../rest/messages/TaskManagerInfoTest.java      |  58 ---------
 .../rest/messages/TaskManagersInfoTest.java     |  42 -------
 .../taskmanager/TaskManagerInfoTest.java        |  60 +++++++++
 .../taskmanager/TaskManagersInfoTest.java       |  44 +++++++
 16 files changed, 446 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 97f1f77..e2d411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.TaskManagersHandler;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -61,11 +61,11 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
-import org.apache.flink.runtime.rest.messages.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FileUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 60ccb27..f24cdc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
-import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index a0cf877..c8eb012 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
deleted file mode 100644
index 539c672..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.TaskManagersInfo;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Returns an overview over all registered TaskManagers of the cluster.
- */
-public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
-
-	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
-
-	public TaskManagersHandler(
-			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<T> leaderRetriever,
-			Time timeout,
-			Map<String, String> responseHeaders,
-			MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,
-			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
-		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
-
-		this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
-	}
-
-	@Override
-	protected CompletableFuture<TaskManagersInfo> handleRequest(
-			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
-			@Nonnull T gateway) throws RestHandlerException {
-		Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
-
-		ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
-			() -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
-
-		return resourceManagerGateway
-			.requestTaskManagerInfo(timeout)
-			.thenApply(TaskManagersInfo::new);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 7df0545..18a7fd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
-import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
new file mode 100644
index 0000000..484b4ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagersHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Returns an overview over all registered TaskManagers of the cluster.
+ */
+public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+
+	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+	public TaskManagersHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+		this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+	}
+
+	@Override
+	protected CompletableFuture<TaskManagersInfo> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull T gateway) throws RestHandlerException {
+		Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
+
+		ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
+			() -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
+
+		return resourceManagerGateway
+			.requestTaskManagerInfo(timeout)
+			.thenApply(TaskManagersInfo::new);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
deleted file mode 100644
index 979033b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
-import org.apache.flink.runtime.rest.messages.json.InstanceIDSerializer;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
-import java.util.Objects;
-
-/**
- * Base class containing information for a {@link TaskExecutor}.
- */
-public class TaskManagerInfo implements ResponseBody {
-
-	public static final String FIELD_NAME_INSTANCE_ID = "id";
-
-	public static final String FIELD_NAME_ADDRESS = "path";
-
-	public static final String FIELD_NAME_DATA_PORT = "dataPort";
-
-	public static final String FIELD_NAME_LAST_HEARTBEAT = "timeSinceLastHeartbeat";
-
-	public static final String FIELD_NAME_NUMBER_SLOTS = "slotsNumber";
-
-	public static final String FIELD_NAME_NUMBER_AVAILABLE_SLOTS = "freeSlots";
-
-	public static final String FIELD_NAME_HARDWARE = "hardware";
-
-	@JsonProperty(FIELD_NAME_INSTANCE_ID)
-	@JsonSerialize(using = InstanceIDSerializer.class)
-	private final InstanceID instanceId;
-
-	@JsonProperty(FIELD_NAME_ADDRESS)
-	private final String address;
-
-	@JsonProperty(FIELD_NAME_DATA_PORT)
-	private final int dataPort;
-
-	@JsonProperty(FIELD_NAME_LAST_HEARTBEAT)
-	private final long lastHeartbeat;
-
-	@JsonProperty(FIELD_NAME_NUMBER_SLOTS)
-	private final int numberSlots;
-
-	@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS)
-	private final int numberAvailableSlots;
-
-	@JsonProperty(FIELD_NAME_HARDWARE)
-	private final HardwareDescription hardwareDescription;
-
-	@JsonCreator
-	public TaskManagerInfo(
-			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
-			@JsonProperty(FIELD_NAME_ADDRESS) String address,
-			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
-			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
-			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
-			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
-			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) {
-		this.instanceId = Preconditions.checkNotNull(instanceId);
-		this.address = Preconditions.checkNotNull(address);
-		this.dataPort = dataPort;
-		this.lastHeartbeat = lastHeartbeat;
-		this.numberSlots = numberSlots;
-		this.numberAvailableSlots = numberAvailableSlots;
-		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		TaskManagerInfo that = (TaskManagerInfo) o;
-		return dataPort == that.dataPort &&
-			lastHeartbeat == that.lastHeartbeat &&
-			numberSlots == that.numberSlots &&
-			numberAvailableSlots == that.numberAvailableSlots &&
-			Objects.equals(instanceId, that.instanceId) &&
-			Objects.equals(address, that.address) &&
-			Objects.equals(hardwareDescription, that.hardwareDescription);
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(
-			instanceId,
-			address,
-			dataPort,
-			lastHeartbeat,
-			numberSlots,
-			numberAvailableSlots,
-			hardwareDescription);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
deleted file mode 100644
index bc0f82e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link TaskManagersHandler}.
- */
-public class TaskManagersHeaders implements MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
-
-	private static final TaskManagersHeaders INSTANCE = new TaskManagersHeaders();
-
-	public static final String URL = "/taskmanagers";
-
-	private TaskManagersHeaders() {}
-
-	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
-	}
-
-	@Override
-	public Class<TaskManagersInfo> getResponseClass() {
-		return TaskManagersInfo.class;
-	}
-
-	@Override
-	public HttpResponseStatus getResponseStatusCode() {
-		return HttpResponseStatus.OK;
-	}
-
-	@Override
-	public EmptyMessageParameters getUnresolvedMessageParameters() {
-		return EmptyMessageParameters.getInstance();
-	}
-
-	@Override
-	public HttpMethodWrapper getHttpMethod() {
-		return HttpMethodWrapper.GET;
-	}
-
-	@Override
-	public String getTargetRestEndpointURL() {
-		return URL;
-	}
-
-	public static TaskManagersHeaders getInstance() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
deleted file mode 100644
index 2149912..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Collection;
-import java.util.Objects;
-
-/**
- * Class containing a collection of {@link TaskManagerInfo}.
- */
-public class TaskManagersInfo implements ResponseBody {
-
-	public static final String FIELD_NAME_TASK_MANAGERS = "taskmanagers";
-
-	@JsonProperty(FIELD_NAME_TASK_MANAGERS)
-	private final Collection<TaskManagerInfo> taskManagerInfos;
-
-	@JsonCreator
-	public TaskManagersInfo(
-			@JsonProperty(FIELD_NAME_TASK_MANAGERS) Collection<TaskManagerInfo> taskManagerInfos) {
-		this.taskManagerInfos = Preconditions.checkNotNull(taskManagerInfos);
-	}
-
-	public Collection<TaskManagerInfo> getTaskManagerInfos() {
-		return taskManagerInfos;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		TaskManagersInfo that = (TaskManagersInfo) o;
-		return Objects.equals(taskManagerInfos, that.taskManagerInfos);
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(taskManagerInfos);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
new file mode 100644
index 0000000..00c066e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDSerializer;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Objects;
+
+/**
+ * Base class containing information for a {@link TaskExecutor}.
+ */
+public class TaskManagerInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_INSTANCE_ID = "id";
+
+	public static final String FIELD_NAME_ADDRESS = "path";
+
+	public static final String FIELD_NAME_DATA_PORT = "dataPort";
+
+	public static final String FIELD_NAME_LAST_HEARTBEAT = "timeSinceLastHeartbeat";
+
+	public static final String FIELD_NAME_NUMBER_SLOTS = "slotsNumber";
+
+	public static final String FIELD_NAME_NUMBER_AVAILABLE_SLOTS = "freeSlots";
+
+	public static final String FIELD_NAME_HARDWARE = "hardware";
+
+	@JsonProperty(FIELD_NAME_INSTANCE_ID)
+	@JsonSerialize(using = InstanceIDSerializer.class)
+	private final InstanceID instanceId;
+
+	@JsonProperty(FIELD_NAME_ADDRESS)
+	private final String address;
+
+	@JsonProperty(FIELD_NAME_DATA_PORT)
+	private final int dataPort;
+
+	@JsonProperty(FIELD_NAME_LAST_HEARTBEAT)
+	private final long lastHeartbeat;
+
+	@JsonProperty(FIELD_NAME_NUMBER_SLOTS)
+	private final int numberSlots;
+
+	@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS)
+	private final int numberAvailableSlots;
+
+	@JsonProperty(FIELD_NAME_HARDWARE)
+	private final HardwareDescription hardwareDescription;
+
+	@JsonCreator
+	public TaskManagerInfo(
+			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonProperty(FIELD_NAME_ADDRESS) String address,
+			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
+			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
+			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
+			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
+			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) {
+		this.instanceId = Preconditions.checkNotNull(instanceId);
+		this.address = Preconditions.checkNotNull(address);
+		this.dataPort = dataPort;
+		this.lastHeartbeat = lastHeartbeat;
+		this.numberSlots = numberSlots;
+		this.numberAvailableSlots = numberAvailableSlots;
+		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		TaskManagerInfo that = (TaskManagerInfo) o;
+		return dataPort == that.dataPort &&
+			lastHeartbeat == that.lastHeartbeat &&
+			numberSlots == that.numberSlots &&
+			numberAvailableSlots == that.numberAvailableSlots &&
+			Objects.equals(instanceId, that.instanceId) &&
+			Objects.equals(address, that.address) &&
+			Objects.equals(hardwareDescription, that.hardwareDescription);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+			instanceId,
+			address,
+			dataPort,
+			lastHeartbeat,
+			numberSlots,
+			numberAvailableSlots,
+			hardwareDescription);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java
new file mode 100644
index 0000000..f1e9765
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link TaskManagersHandler}: a GET request on {@link #URL} with an {@link EmptyRequestBody} and {@link EmptyMessageParameters}, answered with a {@link TaskManagersInfo} and status OK. NOTE(review): the import in this file resolves the handler link to {@code rest.handler.legacy} - confirm that is the intended target.
+ */
+public class TaskManagersHeaders implements MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+
+	private static final TaskManagersHeaders INSTANCE = new TaskManagersHeaders(); // the single shared instance handed out by getInstance()
+
+	public static final String URL = "/taskmanagers"; // REST path returned by getTargetRestEndpointURL()
+
+	private TaskManagersHeaders() {} // private: instances are only obtainable through getInstance()
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() { // request body type: always EmptyRequestBody
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<TaskManagersInfo> getResponseClass() { // response body type: TaskManagersInfo
+		return TaskManagersInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() { // status code declared for responses: OK
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() { // returns the instance provided by EmptyMessageParameters.getInstance()
+		return EmptyMessageParameters.getInstance();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() { // the endpoint is declared as a GET
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() { // returns the URL constant defined above
+		return URL;
+	}
+
+	public static TaskManagersHeaders getInstance() { // accessor for the shared INSTANCE; never creates a new object
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfo.java
new file mode 100644
index 0000000..2e4b1a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Class containing a collection of {@link TaskManagerInfo}; a {@link ResponseBody} whose single property is annotated for Jackson under the JSON name {@value #FIELD_NAME_TASK_MANAGERS}.
+ */
+public class TaskManagersInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_TASK_MANAGERS = "taskmanagers"; // JSON property name shared by the field and the creator parameter
+
+	@JsonProperty(FIELD_NAME_TASK_MANAGERS)
+	private final Collection<TaskManagerInfo> taskManagerInfos; // assigned once in the constructor and passed through checkNotNull there
+
+	@JsonCreator
+	public TaskManagersInfo(
+			@JsonProperty(FIELD_NAME_TASK_MANAGERS) Collection<TaskManagerInfo> taskManagerInfos) {
+		this.taskManagerInfos = Preconditions.checkNotNull(taskManagerInfos); // null is rejected; no defensive copy is made in this constructor
+	}
+
+	public Collection<TaskManagerInfo> getTaskManagerInfos() { // returns the stored collection itself, not a copy
+		return taskManagerInfos;
+	}
+
+	@Override
+	public boolean equals(Object o) { // equal when the other object has the same class and an equal collection
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) { // exact class match required
+			return false;
+		}
+		TaskManagersInfo that = (TaskManagersInfo) o;
+		return Objects.equals(taskManagerInfos, that.taskManagerInfos); // delegates to the collection's own equals(), so the result depends on the Collection implementation
+	}
+
+	@Override
+	public int hashCode() { // derived from the same single field that equals() compares
+		return Objects.hash(taskManagerInfos);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
deleted file mode 100644
index 371e818..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.InstanceID;
-
-import java.util.Random;
-import java.util.UUID;
-
-/**
- * Test for (un)marshalling of the {@link TaskManagerInfo}.
- */
-public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskManagerInfo> {
-
-	private static final Random random = new Random();
-
-	@Override
-	protected Class<TaskManagerInfo> getTestResponseClass() {
-		return TaskManagerInfo.class;
-	}
-
-	@Override
-	protected TaskManagerInfo getTestResponseInstance() throws Exception {
-		return createRandomTaskManagerInfo();
-	}
-
-	static TaskManagerInfo createRandomTaskManagerInfo() {
-		return new TaskManagerInfo(
-			new InstanceID(),
-			UUID.randomUUID().toString(),
-			random.nextInt(),
-			random.nextLong(),
-			random.nextInt(),
-			random.nextInt(),
-			new HardwareDescription(
-				random.nextInt(),
-				random.nextLong(),
-				random.nextLong(),
-				random.nextLong()));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
deleted file mode 100644
index 1f53674..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import java.util.Arrays;
-
-import static org.apache.flink.runtime.rest.messages.TaskManagerInfoTest.createRandomTaskManagerInfo;
-
-/**
- * Test for (un)marshalling of {@link TaskManagersInfo}.
- */
-public class TaskManagersInfoTest extends RestResponseMarshallingTestBase<TaskManagersInfo> {
-
-	@Override
-	protected Class<TaskManagersInfo> getTestResponseClass() {
-		return TaskManagersInfo.class;
-	}
-
-	@Override
-	protected TaskManagersInfo getTestResponseInstance() throws Exception {
-		final TaskManagerInfo taskManagerInfo1 = createRandomTaskManagerInfo();
-		final TaskManagerInfo taskManagerInfo2 = createRandomTaskManagerInfo();
-
-		return new TaskManagersInfo(Arrays.asList(taskManagerInfo1, taskManagerInfo2));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
new file mode 100644
index 0000000..554e990
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Test for (un)marshalling of the {@link TaskManagerInfo}; this class only supplies the response class and a randomly populated instance to {@link RestResponseMarshallingTestBase}.
+ */
+public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskManagerInfo> {
+
+	private static final Random random = new Random(); // unseeded, so the generated values differ between runs
+
+	@Override
+	protected Class<TaskManagerInfo> getTestResponseClass() { // the type handed to the base class
+		return TaskManagerInfo.class;
+	}
+
+	@Override
+	protected TaskManagerInfo getTestResponseInstance() throws Exception { // a fresh random instance on every call
+		return createRandomTaskManagerInfo();
+	}
+
+	static TaskManagerInfo createRandomTaskManagerInfo() { // package-private factory, statically imported by TaskManagersInfoTest
+		return new TaskManagerInfo( // arguments follow the TaskManagerInfo constructor's parameter order
+			new InstanceID(),
+			UUID.randomUUID().toString(),
+			random.nextInt(), // nextInt()/nextLong() span the full range, so negative values are possible
+			random.nextLong(), // lastHeartbeat
+			random.nextInt(), // numberSlots
+			random.nextInt(), // numberAvailableSlots
+			new HardwareDescription( // hardwareDescription, itself built from four random numbers
+				random.nextInt(),
+				random.nextLong(),
+				random.nextLong(),
+				random.nextLong()));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f7f04a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfoTest.java
new file mode 100644
index 0000000..5a8663b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersInfoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfoTest.createRandomTaskManagerInfo;
+
+/**
+ * Test for (un)marshalling of {@link TaskManagersInfo}; supplies the response class and a two-element instance to {@link RestResponseMarshallingTestBase}.
+ */
+public class TaskManagersInfoTest extends RestResponseMarshallingTestBase<TaskManagersInfo> {
+
+	@Override
+	protected Class<TaskManagersInfo> getTestResponseClass() { // the type handed to the base class
+		return TaskManagersInfo.class;
+	}
+
+	@Override
+	protected TaskManagersInfo getTestResponseInstance() throws Exception { // wraps two independently generated random entries
+		final TaskManagerInfo taskManagerInfo1 = createRandomTaskManagerInfo();
+		final TaskManagerInfo taskManagerInfo2 = createRandomTaskManagerInfo();
+
+		return new TaskManagersInfo(Arrays.asList(taskManagerInfo1, taskManagerInfo2)); // Arrays.asList yields a fixed-size List holding both entries
+	}
+}