You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:04 UTC

[06/30] flink git commit: [FLINK-7648] [flip6] Add TaskManagersHandler

[FLINK-7648] [flip6] Add TaskManagersHandler

Send dataPort and HardwareDescription to RM

Instantiate RM leader retriever


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/def87816
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/def87816
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/def87816

Branch: refs/heads/master
Commit: def87816f376740902f0944a6aa5791a0a937e89
Parents: eddb5b0
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 12 00:40:17 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:42 2017 +0100

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   5 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  19 ++-
 .../entrypoint/SessionClusterEntrypoint.java    |  24 ++++
 .../runtime/instance/HardwareDescription.java   |  44 ++++++-
 .../resourcemanager/ResourceManager.java        |  44 ++++++-
 .../resourcemanager/ResourceManagerGateway.java |  14 +++
 .../registration/WorkerRegistration.java        |  21 +++-
 .../rest/handler/TaskManagersHandler.java       |  71 +++++++++++
 .../runtime/rest/messages/TaskManagerInfo.java  | 123 +++++++++++++++++++
 .../rest/messages/TaskManagersHeaders.java      |  70 +++++++++++
 .../runtime/rest/messages/TaskManagersInfo.java |  65 ++++++++++
 .../messages/json/InstanceIDDeserializer.java   |  45 +++++++
 .../messages/json/InstanceIDSerializer.java     |  44 +++++++
 .../runtime/taskexecutor/TaskExecutor.java      |   7 ++
 ...TaskExecutorToResourceManagerConnection.java |  31 ++++-
 .../clusterframework/ResourceManagerTest.java   |   5 +
 .../ResourceManagerTaskExecutorTest.java        |  13 +-
 .../rest/messages/TaskManagerInfoTest.java      |  58 +++++++++
 .../rest/messages/TaskManagersInfoTest.java     |  42 +++++++
 .../runtime/taskexecutor/TaskExecutorTest.java  |  35 ++++--
 20 files changed, 749 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index a45abe0..a0a8069 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -644,9 +645,11 @@ public class MesosResourceManagerTest extends TestLogger {
 			startResourceManager();
 			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
 
+			final int dataPort = 1234;
+			final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
 			// send registration message
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, timeout);
+				resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, dataPort, hardwareDescription, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 7f9c148..829f058 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -22,11 +22,13 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.TaskManagersHandler;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -59,6 +61,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -87,6 +90,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	private final GatewayRetriever<DispatcherGateway> leaderRetriever;
 	private final Configuration clusterConfiguration;
 	private final RestHandlerConfiguration restConfiguration;
+	private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
 	private final Executor executor;
 
 	private final ExecutionGraphCache executionGraphCache;
@@ -97,11 +101,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			GatewayRetriever<DispatcherGateway> leaderRetriever,
 			Configuration clusterConfiguration,
 			RestHandlerConfiguration restConfiguration,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
 			Executor executor) {
 		super(endpointConfiguration);
 		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
 		this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
 		this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
+		this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);
 		this.executor = Preconditions.checkNotNull(executor);
 
 		this.executionGraphCache = new ExecutionGraphCache(
@@ -223,7 +229,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor,
 			checkpointStatsCache);
-
+			
 		JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -232,7 +238,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			JobExceptionsHeaders.getInstance(),
 			executionGraphCache,
 			executor);
-
+			
 		JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -254,6 +260,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			timeout,
 			responseHeaders);
 
+		TaskManagersHandler<DispatcherGateway> taskManagersHandler = new TaskManagersHandler<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			TaskManagersHeaders.getInstance(),
+			resourceManagerRetriever);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -284,6 +298,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
+		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index e24e01a..8a3cfc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -56,6 +57,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 	private LeaderRetrievalService dispatcherLeaderRetrievalService;
 
+	private LeaderRetrievalService resourceManagerRetrievalService;
+
 	private DispatcherRestEndpoint dispatcherRestEndpoint;
 
 	public SessionClusterEntrypoint(Configuration configuration) {
@@ -73,6 +76,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 		dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 
+		resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
 		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 			rpcService,
 			DispatcherGateway.class,
@@ -80,9 +85,17 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			10,
 			Time.milliseconds(50L));
 
+		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+			rpcService,
+			ResourceManagerGateway.class,
+			ResourceManagerId::new,
+			10,
+			Time.milliseconds(50L));
+
 		dispatcherRestEndpoint = createDispatcherRestEndpoint(
 			configuration,
 			dispatcherGatewayRetriever,
+			resourceManagerGatewayRetriever,
 			rpcService.getExecutor());
 
 		LOG.debug("Starting Dispatcher REST endpoint.");
@@ -110,6 +123,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
+		resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
 
 		LOG.debug("Starting Dispatcher.");
 		dispatcher.start();
@@ -140,6 +154,14 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			}
 		}
 
+		if (resourceManagerRetrievalService != null) {
+			try {
+				resourceManagerRetrievalService.stop();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (resourceManager != null) {
 			try {
 				resourceManager.shutDown();
@@ -156,6 +178,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 	protected DispatcherRestEndpoint createDispatcherRestEndpoint(
 			Configuration configuration,
 			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 			Executor executor) throws Exception {
 
 		return new DispatcherRestEndpoint(
@@ -163,6 +186,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			dispatcherGatewayRetriever,
 			configuration,
 			RestHandlerConfiguration.fromConfiguration(configuration),
+			resourceManagerGatewayRetriever,
 			executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index 9c1c5b7..30f7480 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -20,7 +20,11 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.util.Hardware;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * A hardware description describes the resources available to a task manager.
@@ -29,16 +33,28 @@ public final class HardwareDescription implements Serializable {
 
 	private static final long serialVersionUID = 3380016608300325361L;
 
+	public static final String FIELD_NAME_CPU_CORES = "cpuCores";
+
+	public static final String FIELD_NAME_SIZE_PHYSICAL_MEMORY = "physicalMemory";
+
+	public static final String FIELD_NAME_SIZE_JVM_HEAP = "freeMemory";
+
+	public static final String FIELD_NAME_SIZE_MANAGED_MEMORY = "managedMemory";
+
 	/** The number of CPU cores available to the JVM on the compute node. */
+	@JsonProperty(FIELD_NAME_CPU_CORES)
 	private final int numberOfCPUCores;
 
 	/** The size of physical memory in bytes available on the compute node. */
+	@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY)
 	private final long sizeOfPhysicalMemory;
 
 	/** The size of the JVM heap memory */
+	@JsonProperty(FIELD_NAME_SIZE_JVM_HEAP)
 	private final long sizeOfJvmHeap;
 
 	/** The size of the memory managed by the system for caching, hashing, sorting, ... */
+	@JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY)
 	private final long sizeOfManagedMemory;
 
 	/**
@@ -49,7 +65,12 @@ public final class HardwareDescription implements Serializable {
 	 * @param sizeOfJvmHeap The size of the JVM heap memory.
 	 * @param sizeOfManagedMemory The size of the memory managed by the system for caching, hashing, sorting, ...
 	 */
-	public HardwareDescription(int numberOfCPUCores, long sizeOfPhysicalMemory, long sizeOfJvmHeap, long sizeOfManagedMemory) {
+	@JsonCreator
+	public HardwareDescription(
+			@JsonProperty(FIELD_NAME_CPU_CORES) int numberOfCPUCores,
+			@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) long sizeOfPhysicalMemory,
+			@JsonProperty(FIELD_NAME_SIZE_JVM_HEAP) long sizeOfJvmHeap,
+			@JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY) long sizeOfManagedMemory) {
 		this.numberOfCPUCores = numberOfCPUCores;
 		this.sizeOfPhysicalMemory = sizeOfPhysicalMemory;
 		this.sizeOfJvmHeap = sizeOfJvmHeap;
@@ -96,6 +117,27 @@ public final class HardwareDescription implements Serializable {
 	// Utils
 	// --------------------------------------------------------------------------------------------
 
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		HardwareDescription that = (HardwareDescription) o;
+		return numberOfCPUCores == that.numberOfCPUCores &&
+			sizeOfPhysicalMemory == that.sizeOfPhysicalMemory &&
+			sizeOfJvmHeap == that.sizeOfJvmHeap &&
+			sizeOfManagedMemory == that.sizeOfManagedMemory;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
+	}
+
 	@Override
 	public String toString() {
 		return String.format("cores=%d, physMem=%d, heap=%d, managed=%d", 

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index cccaf95..60ccb27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -51,6 +52,7 @@ import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -330,6 +332,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			final String taskExecutorAddress,
 			final ResourceID taskExecutorResourceId,
 			final SlotReport slotReport,
+			final int dataPort,
+			final HardwareDescription hardwareDescription,
 			final Time timeout) {
 
 		CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
@@ -343,7 +347,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						taskExecutorGateway,
 						taskExecutorAddress,
 						taskExecutorResourceId,
-						slotReport);
+						slotReport,
+						dataPort,
+						hardwareDescription);
 				}
 			},
 			getMainThreadExecutor());
@@ -486,6 +492,28 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@Override
+	public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
+
+		ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
+
+		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
+			final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
+
+			taskManagerInfos.add(
+				new TaskManagerInfo(
+					taskExecutor.getInstanceID(),
+					taskExecutor.getTaskExecutorGateway().getAddress(),
+					taskExecutor.getDataPort(),
+					taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
+					slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
+					slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+					taskExecutor.getHardwareDescription()));
+		}
+
+		return CompletableFuture.completedFuture(taskManagerInfos);
+	}
+
+	@Override
 	public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
 		final int numberSlots = slotManager.getNumberRegisteredSlots();
 		final int numberFreeSlots = slotManager.getNumberFreeSlots();
@@ -588,13 +616,17 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * @param taskExecutorAddress address of the TaskExecutor
 	 * @param taskExecutorResourceId ResourceID of the TaskExecutor
 	 * @param slotReport initial slot report from the TaskExecutor
+	 * @param dataPort port used for data transfer
+	 * @param hardwareDescription hardware description of the registering TaskExecutor
 	 * @return RegistrationResponse
 	 */
 	private RegistrationResponse registerTaskExecutorInternal(
-		TaskExecutorGateway taskExecutorGateway,
-		String taskExecutorAddress,
-		ResourceID taskExecutorResourceId,
-		SlotReport slotReport) {
+			TaskExecutorGateway taskExecutorGateway,
+			String taskExecutorAddress,
+			ResourceID taskExecutorResourceId,
+			SlotReport slotReport,
+			int dataPort,
+			HardwareDescription hardwareDescription) {
 		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
 		if (oldRegistration != null) {
 			// TODO :: suggest old taskExecutor to stop itself
@@ -612,7 +644,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			return new RegistrationResponse.Decline("unrecognized TaskExecutor");
 		} else {
 			WorkerRegistration<WorkerType> registration =
-				new WorkerRegistration<>(taskExecutorGateway, newWorker);
+				new WorkerRegistration<>(taskExecutorGateway, newWorker, dataPort, hardwareDescription);
 
 			taskExecutors.put(taskExecutorResourceId, registration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index f67368c..a0cf877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -25,10 +25,12 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -79,6 +81,8 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	 * @param taskExecutorAddress The address of the TaskExecutor that registers
 	 * @param resourceId The resource ID of the TaskExecutor that registers
 	 * @param slotReport The slot report containing free and allocated task slots
+	 * @param dataPort port used for data communication between TaskExecutors
+	 * @param hardwareDescription hardware description of the registering TaskExecutor
 	 * @param timeout The timeout for the response.
 	 *
 	 * @return The future to the response by the ResourceManager.
@@ -87,6 +91,8 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 		String taskExecutorAddress,
 		ResourceID resourceId,
 		SlotReport slotReport,
+		int dataPort,
+		HardwareDescription hardwareDescription,
 		@RpcTimeout Time timeout);
 
 	/**
@@ -162,6 +168,14 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	void disconnectJobManager(JobID jobId, Exception cause);
 
 	/**
+	 * Requests information about all registered {@link TaskExecutor TaskExecutors}.
+	 *
+	 * @param timeout for the rpc.
+	 * @return Future collection of TaskManager information
+	 */
+	CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(@RpcTimeout Time timeout);
+	 
+	/**
 	 * Requests the resource overview. The resource overview provides information about the
 	 * connected TaskManagers, the total number of slots and the number of available slots.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 8f949b0..47697b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager.registration;
 
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 
@@ -30,12 +31,30 @@ public class WorkerRegistration<WorkerType extends Serializable> extends TaskExe
 
 	private final WorkerType worker;
 
-	public WorkerRegistration(TaskExecutorGateway taskExecutorGateway, WorkerType worker) {
+	private final int dataPort;
+
+	private final HardwareDescription hardwareDescription;
+
+	public WorkerRegistration(
+			TaskExecutorGateway taskExecutorGateway,
+			WorkerType worker,
+			int dataPort,
+			HardwareDescription hardwareDescription) {
 		super(taskExecutorGateway);
 		this.worker = Preconditions.checkNotNull(worker);
+		this.dataPort = dataPort;
+		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
 	}
 
 	public WorkerType getWorker() {
 		return worker;
 	}
+
+	public int getDataPort() {
+		return dataPort;
+	}
+
+	public HardwareDescription getHardwareDescription() {
+		return hardwareDescription;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
new file mode 100644
index 0000000..539c672
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.TaskManagersInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Returns an overview of all TaskManagers registered in the cluster.
+ */
+public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+
+	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+	public TaskManagersHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+		this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+	}
+
+	@Override
+	protected CompletableFuture<TaskManagersInfo> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull T gateway) throws RestHandlerException {
+		Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
+
+		ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
+			() -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
+
+		return resourceManagerGateway
+			.requestTaskManagerInfo(timeout)
+			.thenApply(TaskManagersInfo::new);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
new file mode 100644
index 0000000..979033b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDSerializer;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Objects;
+
+/**
+ * Base class containing information for a {@link TaskExecutor}, (un)marshalled as a JSON {@link ResponseBody} through the Jackson annotations below.
+ */
+public class TaskManagerInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_INSTANCE_ID = "id"; // JSON key of instanceId
+
+	public static final String FIELD_NAME_ADDRESS = "path"; // JSON key of address; note it differs from the Java field name
+
+	public static final String FIELD_NAME_DATA_PORT = "dataPort"; // JSON key of dataPort
+
+	public static final String FIELD_NAME_LAST_HEARTBEAT = "timeSinceLastHeartbeat"; // JSON key of lastHeartbeat; note it differs from the Java field name
+
+	public static final String FIELD_NAME_NUMBER_SLOTS = "slotsNumber"; // JSON key of numberSlots
+
+	public static final String FIELD_NAME_NUMBER_AVAILABLE_SLOTS = "freeSlots"; // JSON key of numberAvailableSlots
+
+	public static final String FIELD_NAME_HARDWARE = "hardware"; // JSON key of hardwareDescription
+
+	@JsonProperty(FIELD_NAME_INSTANCE_ID)
+	@JsonSerialize(using = InstanceIDSerializer.class) // written by a custom serializer; the matching deserializer is declared on the constructor parameter
+	private final InstanceID instanceId;
+
+	@JsonProperty(FIELD_NAME_ADDRESS)
+	private final String address;
+
+	@JsonProperty(FIELD_NAME_DATA_PORT)
+	private final int dataPort;
+
+	@JsonProperty(FIELD_NAME_LAST_HEARTBEAT)
+	private final long lastHeartbeat; // unit and reference point are not visible here — TODO confirm against the producer of this value
+
+	@JsonProperty(FIELD_NAME_NUMBER_SLOTS)
+	private final int numberSlots;
+
+	@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS)
+	private final int numberAvailableSlots;
+
+	@JsonProperty(FIELD_NAME_HARDWARE)
+	private final HardwareDescription hardwareDescription;
+
+	@JsonCreator // Jackson builds instances through this constructor, matching JSON keys to parameters
+	public TaskManagerInfo(
+			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonProperty(FIELD_NAME_ADDRESS) String address,
+			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
+			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
+			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
+			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
+			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) {
+		this.instanceId = Preconditions.checkNotNull(instanceId); // required
+		this.address = Preconditions.checkNotNull(address); // required
+		this.dataPort = dataPort; // numeric values are stored as given, without range checks
+		this.lastHeartbeat = lastHeartbeat;
+		this.numberSlots = numberSlots;
+		this.numberAvailableSlots = numberAvailableSlots;
+		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription); // required
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) { // exact class match: an instance of a subclass is never equal to a plain TaskManagerInfo
+			return false;
+		}
+		TaskManagerInfo that = (TaskManagerInfo) o;
+		return dataPort == that.dataPort && // all seven fields take part in equality, primitives first
+			lastHeartbeat == that.lastHeartbeat &&
+			numberSlots == that.numberSlots &&
+			numberAvailableSlots == that.numberAvailableSlots &&
+			Objects.equals(instanceId, that.instanceId) &&
+			Objects.equals(address, that.address) &&
+			Objects.equals(hardwareDescription, that.hardwareDescription);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash( // hashes the same seven fields that equals compares
+			instanceId,
+			address,
+			dataPort,
+			lastHeartbeat,
+			numberSlots,
+			numberAvailableSlots,
+			hardwareDescription);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
new file mode 100644
index 0000000..bc0f82e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link TaskManagersHandler}: a GET on {@code /taskmanagers} with an empty request body, answered with {@link TaskManagersInfo} and status OK.
+ */
+public class TaskManagersHeaders implements MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> { // NOTE(review): the Javadoc link above resolves to the imported handler.legacy.TaskManagersHandler — confirm that is the intended target
+
+	private static final TaskManagersHeaders INSTANCE = new TaskManagersHeaders(); // the only instance; handed out by getInstance()
+
+	public static final String URL = "/taskmanagers"; // REST path returned by getTargetRestEndpointURL()
+
+	private TaskManagersHeaders() {} // private: instances are obtained through getInstance() only
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<TaskManagersInfo> getResponseClass() {
+		return TaskManagersInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance(); // this endpoint declares no message parameters
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static TaskManagersHeaders getInstance() {
+		return INSTANCE; // always the same shared object
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
new file mode 100644
index 0000000..2149912
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Class containing a collection of {@link TaskManagerInfo}, (un)marshalled as a {@link ResponseBody} under the JSON key {@code taskmanagers}.
+ */
+public class TaskManagersInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_TASK_MANAGERS = "taskmanagers"; // JSON key of the collection
+
+	@JsonProperty(FIELD_NAME_TASK_MANAGERS)
+	private final Collection<TaskManagerInfo> taskManagerInfos;
+
+	@JsonCreator // Jackson builds instances through this constructor
+	public TaskManagersInfo(
+			@JsonProperty(FIELD_NAME_TASK_MANAGERS) Collection<TaskManagerInfo> taskManagerInfos) {
+		this.taskManagerInfos = Preconditions.checkNotNull(taskManagerInfos); // required; the reference is stored as passed, no defensive copy is made
+	}
+
+	public Collection<TaskManagerInfo> getTaskManagerInfos() {
+		return taskManagerInfos; // hands out the stored collection itself, not a copy
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) { // exact class match required
+			return false;
+		}
+		TaskManagersInfo that = (TaskManagersInfo) o;
+		return Objects.equals(taskManagerInfos, that.taskManagerInfos); // delegates to the collection's own equals, so the outcome depends on the concrete Collection types involved
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(taskManagerInfos); // derived from the same single field that equals compares
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
new file mode 100644
index 0000000..1c53dcd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Json deserializer for {@link InstanceID}: reads the current value as a string and converts it with {@code StringUtils.hexStringToByte}.
+ */
+public class InstanceIDDeserializer extends StdDeserializer<InstanceID> {
+
+	private static final long serialVersionUID = -9058463293913469849L;
+
+	protected InstanceIDDeserializer() {
+		super(InstanceID.class); // registers InstanceID as the handled value type
+	}
+
+	@Override
+	public InstanceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+		return new InstanceID(StringUtils.hexStringToByte(p.getValueAsString())); // NOTE(review): getValueAsString() can return null for non-scalar or null tokens — confirm hexStringToByte copes with that
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
new file mode 100644
index 0000000..f3c0dc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Json serializer for {@link InstanceID}: writes {@code value.toString()} as a single JSON string.
+ */
+public class InstanceIDSerializer extends StdSerializer<InstanceID> {
+
+	private static final long serialVersionUID = 5798852092159615938L;
+
+	protected InstanceIDSerializer() {
+		super(InstanceID.class); // registers InstanceID as the handled value type
+	}
+
+	@Override
+	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+		gen.writeString(value.toString()); // presumably toString() yields the hex form that InstanceIDDeserializer parses back — verify the two stay symmetric
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c48d188..a348948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -166,6 +167,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	// ------------------------------------------------------------------------
 
+	private final HardwareDescription hardwareDescription;
+
 	public TaskExecutor(
 			RpcService rpcService,
 			TaskManagerConfiguration taskManagerConfiguration,
@@ -214,6 +217,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			new ResourceManagerHeartbeatListener(),
 			rpcService.getScheduledExecutor(),
 			log);
+
+		hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize());
 	}
 
 	// ------------------------------------------------------------------------
@@ -720,6 +725,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 					getAddress(),
 					getResourceID(),
 					taskSlotTable.createSlotReport(getResourceID()),
+					taskManagerLocation.dataPort(),
+					hardwareDescription,
 					newLeaderAddress,
 					newResourceManagerId,
 					getMainThreadExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index c3d3532..1288009 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -51,6 +52,10 @@ public class TaskExecutorToResourceManagerConnection
 
 	private final SlotReport slotReport;
 
+	private final int dataPort;
+
+	private final HardwareDescription hardwareDescription;
+
 	private final RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener;
 
 	private InstanceID registrationId;
@@ -63,6 +68,8 @@ public class TaskExecutorToResourceManagerConnection
 			String taskManagerAddress,
 			ResourceID taskManagerResourceId,
 			SlotReport slotReport,
+			int dataPort,
+			HardwareDescription hardwareDescription,
 			String resourceManagerAddress,
 			ResourceManagerId resourceManagerId,
 			Executor executor,
@@ -74,6 +81,8 @@ public class TaskExecutorToResourceManagerConnection
 		this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
 		this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
 		this.slotReport = Preconditions.checkNotNull(slotReport);
+		this.dataPort = dataPort;
+		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
 		this.registrationListener = Preconditions.checkNotNull(registrationListener);
 	}
 
@@ -87,7 +96,9 @@ public class TaskExecutorToResourceManagerConnection
 			getTargetLeaderId(),
 			taskManagerAddress,
 			taskManagerResourceId,
-			slotReport);
+			slotReport,
+			dataPort,
+			hardwareDescription);
 	}
 
 	@Override
@@ -135,6 +146,10 @@ public class TaskExecutorToResourceManagerConnection
 
 		private final SlotReport slotReport;
 
+		private final int dataPort;
+
+		private final HardwareDescription hardwareDescription;
+
 		ResourceManagerRegistration(
 				Logger log,
 				RpcService rpcService,
@@ -142,12 +157,16 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerId resourceManagerId,
 				String taskExecutorAddress,
 				ResourceID resourceID,
-				SlotReport slotReport) {
+				SlotReport slotReport,
+				int dataPort,
+				HardwareDescription hardwareDescription) {
 
 			super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, resourceManagerId);
 			this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
 			this.resourceID = checkNotNull(resourceID);
 			this.slotReport = checkNotNull(slotReport);
+			this.dataPort = dataPort;
+			this.hardwareDescription = checkNotNull(hardwareDescription);
 		}
 
 		@Override
@@ -155,7 +174,13 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
 
 			Time timeout = Time.milliseconds(timeoutMillis);
-			return resourceManager.registerTaskExecutor(taskExecutorAddress, resourceID, slotReport, timeout);
+			return resourceManager.registerTaskExecutor(
+				taskExecutorAddress,
+				resourceID,
+				slotReport,
+				dataPort,
+				hardwareDescription,
+				timeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index d8e65a6..2b58b6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -481,6 +482,8 @@ public class ResourceManagerTest extends TestLogger {
 
 	@Test
 	public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
+		final int dataPort = 1234;
+		final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
 		final String taskManagerAddress = "tm";
 		final ResourceID taskManagerResourceID = new ResourceID(taskManagerAddress);
 		final ResourceID resourceManagerResourceID = ResourceID.generate();
@@ -537,6 +540,8 @@ public class ResourceManagerTest extends TestLogger {
 				taskManagerAddress,
 				taskManagerResourceID,
 				slotReport,
+				dataPort,
+				hardwareDescription,
 				timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 147d180..524c503 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -60,6 +61,10 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private SlotReport slotReport = new SlotReport();
 
+	private int dataPort = 1234;
+
+	private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
+
 	private static String taskExecutorAddress = "/taskExecutor1";
 
 	private ResourceID taskExecutorResourceID;
@@ -103,13 +108,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -128,7 +133,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
 
 			try {
 				unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -152,7 +157,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, slotReport, timeout);
+				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
 			assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
new file mode 100644
index 0000000..371e818
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Test for (un)marshalling of the {@link TaskManagerInfo}, using a randomly populated instance as the test subject.
+ */
+public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskManagerInfo> {
+
+	private static final Random random = new Random(); // unseeded, so the generated test data differs from run to run
+
+	@Override
+	protected Class<TaskManagerInfo> getTestResponseClass() {
+		return TaskManagerInfo.class;
+	}
+
+	@Override
+	protected TaskManagerInfo getTestResponseInstance() throws Exception {
+		return createRandomTaskManagerInfo();
+	}
+
+	static TaskManagerInfo createRandomTaskManagerInfo() { // package-private so that TaskManagersInfoTest can reuse it
+		return new TaskManagerInfo(
+			new InstanceID(),
+			UUID.randomUUID().toString(), // address
+			random.nextInt(), // dataPort (unconstrained, may be negative)
+			random.nextLong(), // lastHeartbeat
+			random.nextInt(), // numberSlots
+			random.nextInt(), // numberAvailableSlots
+			new HardwareDescription(
+				random.nextInt(),
+				random.nextLong(),
+				random.nextLong(),
+				random.nextLong()));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
new file mode 100644
index 0000000..1f53674
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.rest.messages.TaskManagerInfoTest.createRandomTaskManagerInfo;
+
+/**
+ * Test for (un)marshalling of {@link TaskManagersInfo}, using a two-element collection of random {@link TaskManagerInfo} entries.
+ */
+public class TaskManagersInfoTest extends RestResponseMarshallingTestBase<TaskManagersInfo> {
+
+	@Override
+	protected Class<TaskManagersInfo> getTestResponseClass() {
+		return TaskManagersInfo.class;
+	}
+
+	@Override
+	protected TaskManagersInfo getTestResponseInstance() throws Exception {
+		final TaskManagerInfo taskManagerInfo1 = createRandomTaskManagerInfo(); // helper statically imported from TaskManagerInfoTest
+		final TaskManagerInfo taskManagerInfo2 = createRandomTaskManagerInfo();
+
+		return new TaskManagersInfo(Arrays.asList(taskManagerInfo1, taskManagerInfo2)); // both entries wrapped in a fixed-size list
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index de807a6..776bdf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -260,7 +261,7 @@ public class TaskExecutorTest extends TestLogger {
 		// register the mock resource manager gateway
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
-			anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
 			.thenReturn(
 				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
@@ -334,7 +335,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			// register resource manager success will trigger monitoring heartbeat target between tm and rm
 			verify(rmGateway, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).registerTaskExecutor(
-					eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
+					eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
 
 			// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
 			verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
@@ -363,7 +364,7 @@ public class TaskExecutorTest extends TestLogger {
 		// register the mock resource manager gateway
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
-			anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
 			.thenReturn(
 				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
@@ -454,7 +455,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			// register resource manager success will trigger monitoring heartbeat target between tm and rm
 			verify(rmGateway, timeout(verificationTimeout).atLeast(1)).registerTaskExecutor(
-				eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), any(Time.class));
+				eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), anyInt(), any(HardwareDescription.class), any(Time.class));
 
 			verify(heartbeatManager, timeout(verificationTimeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class));
 
@@ -495,7 +496,7 @@ public class TaskExecutorTest extends TestLogger {
 		// register a mock resource manager gateway
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
-					anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
 				new InstanceID(), resourceManagerResourceId, 10L)));
 
@@ -540,7 +541,7 @@ public class TaskExecutorTest extends TestLogger {
 			String taskManagerAddress = taskManager.getAddress();
 
 			verify(rmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-					eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+					eq(taskManagerAddress), eq(resourceID), eq(slotReport), anyInt(), any(HardwareDescription.class), any(Time.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -567,11 +568,11 @@ public class TaskExecutorTest extends TestLogger {
 		ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
 
 		when(rmGateway1.registerTaskExecutor(
-					anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(
 				new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
 		when(rmGateway2.registerTaskExecutor(
-					anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(
 				new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
@@ -628,7 +629,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address1, leaderId1);
 
 			verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-					eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class));
+					eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// cancel the leader 
@@ -638,7 +639,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address2, leaderId2);
 
 			verify(rmGateway2, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-					eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class));
+					eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), anyInt(), any(HardwareDescription.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// check if a concurrent error occurred
@@ -829,6 +830,8 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
+			anyInt(),
+			any(HardwareDescription.class),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final String jobManagerAddress = "jm";
@@ -947,6 +950,8 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
+			anyInt(),
+			any(HardwareDescription.class),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
@@ -998,6 +1003,8 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManager.getAddress()),
 				eq(resourceId),
 				any(SlotReport.class),
+				anyInt(),
+				any(HardwareDescription.class),
 				any(Time.class));
 
 			taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
@@ -1062,7 +1069,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new SlotReport());
 		when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new AllocationID());
 
-			when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), any(Time.class))).thenReturn(
+			when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class))).thenReturn(
 			CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L)));
 
 		TaskExecutor taskManager = new TaskExecutor(
@@ -1096,7 +1103,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address1, resourceManagerId.toUUID());
 
 			verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-				eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+				eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// test that allocating a slot works
@@ -1125,7 +1132,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			// re-register
 			verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-				eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+				eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
 			testLeaderService.notifyListener(address1, resourceManagerId.toUUID());
 
 			// now we should be successful because the slots status has been synced
@@ -1185,6 +1192,8 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
+			anyInt(),
+			any(HardwareDescription.class),
 			any(Time.class))).thenReturn(
 				CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));