You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:04 UTC
[06/30] flink git commit: [FLINK-7648] [flip6] Add TaskManagersHandler
[FLINK-7648] [flip6] Add TaskManagersHandler
Send dataPort and HardwareDescription to RM
Instantiate RM leader retriever
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/def87816
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/def87816
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/def87816
Branch: refs/heads/master
Commit: def87816f376740902f0944a6aa5791a0a937e89
Parents: eddb5b0
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 12 00:40:17 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:42 2017 +0100
----------------------------------------------------------------------
.../MesosResourceManagerTest.java | 5 +-
.../dispatcher/DispatcherRestEndpoint.java | 19 ++-
.../entrypoint/SessionClusterEntrypoint.java | 24 ++++
.../runtime/instance/HardwareDescription.java | 44 ++++++-
.../resourcemanager/ResourceManager.java | 44 ++++++-
.../resourcemanager/ResourceManagerGateway.java | 14 +++
.../registration/WorkerRegistration.java | 21 +++-
.../rest/handler/TaskManagersHandler.java | 71 +++++++++++
.../runtime/rest/messages/TaskManagerInfo.java | 123 +++++++++++++++++++
.../rest/messages/TaskManagersHeaders.java | 70 +++++++++++
.../runtime/rest/messages/TaskManagersInfo.java | 65 ++++++++++
.../messages/json/InstanceIDDeserializer.java | 45 +++++++
.../messages/json/InstanceIDSerializer.java | 44 +++++++
.../runtime/taskexecutor/TaskExecutor.java | 7 ++
...TaskExecutorToResourceManagerConnection.java | 31 ++++-
.../clusterframework/ResourceManagerTest.java | 5 +
.../ResourceManagerTaskExecutorTest.java | 13 +-
.../rest/messages/TaskManagerInfoTest.java | 58 +++++++++
.../rest/messages/TaskManagersInfoTest.java | 42 +++++++
.../runtime/taskexecutor/TaskExecutorTest.java | 35 ++++--
20 files changed, 749 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index a45abe0..a0a8069 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -644,9 +645,11 @@ public class MesosResourceManagerTest extends TestLogger {
startResourceManager();
assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
+ final int dataPort = 1234;
+ final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
// send registration message
CompletableFuture<RegistrationResponse> successfulFuture =
- resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, timeout);
+ resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, dataPort, hardwareDescription, timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 7f9c148..829f058 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -22,11 +22,13 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.TaskManagersHandler;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
@@ -59,6 +61,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -87,6 +90,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
private final GatewayRetriever<DispatcherGateway> leaderRetriever;
private final Configuration clusterConfiguration;
private final RestHandlerConfiguration restConfiguration;
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
private final Executor executor;
private final ExecutionGraphCache executionGraphCache;
@@ -97,11 +101,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
GatewayRetriever<DispatcherGateway> leaderRetriever,
Configuration clusterConfiguration,
RestHandlerConfiguration restConfiguration,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
Executor executor) {
super(endpointConfiguration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
+ this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);
this.executor = Preconditions.checkNotNull(executor);
this.executionGraphCache = new ExecutionGraphCache(
@@ -223,7 +229,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor,
checkpointStatsCache);
-
+
JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
restAddressFuture,
leaderRetriever,
@@ -232,7 +238,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
JobExceptionsHeaders.getInstance(),
executionGraphCache,
executor);
-
+
JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
restAddressFuture,
leaderRetriever,
@@ -254,6 +260,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
timeout,
responseHeaders);
+ TaskManagersHandler<DispatcherGateway> taskManagersHandler = new TaskManagersHandler<>(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ TaskManagersHeaders.getInstance(),
+ resourceManagerRetriever);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -284,6 +298,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
+ handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index e24e01a..8a3cfc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -56,6 +57,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
private LeaderRetrievalService dispatcherLeaderRetrievalService;
+ private LeaderRetrievalService resourceManagerRetrievalService;
+
private DispatcherRestEndpoint dispatcherRestEndpoint;
public SessionClusterEntrypoint(Configuration configuration) {
@@ -73,6 +76,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+ resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
@@ -80,9 +85,17 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
10,
Time.milliseconds(50L));
+ LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+ rpcService,
+ ResourceManagerGateway.class,
+ ResourceManagerId::new,
+ 10,
+ Time.milliseconds(50L));
+
dispatcherRestEndpoint = createDispatcherRestEndpoint(
configuration,
dispatcherGatewayRetriever,
+ resourceManagerGatewayRetriever,
rpcService.getExecutor());
LOG.debug("Starting Dispatcher REST endpoint.");
@@ -110,6 +123,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
LOG.debug("Starting ResourceManager.");
resourceManager.start();
+ resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
LOG.debug("Starting Dispatcher.");
dispatcher.start();
@@ -140,6 +154,14 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
}
}
+ if (resourceManagerRetrievalService != null) {
+ try {
+ resourceManagerRetrievalService.stop();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
if (resourceManager != null) {
try {
resourceManager.shutDown();
@@ -156,6 +178,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
protected DispatcherRestEndpoint createDispatcherRestEndpoint(
Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+ LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
Executor executor) throws Exception {
return new DispatcherRestEndpoint(
@@ -163,6 +186,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
dispatcherGatewayRetriever,
configuration,
RestHandlerConfiguration.fromConfiguration(configuration),
+ resourceManagerGatewayRetriever,
executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index 9c1c5b7..30f7480 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -20,7 +20,11 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import java.io.Serializable;
+import java.util.Objects;
/**
* A hardware description describes the resources available to a task manager.
@@ -29,16 +33,28 @@ public final class HardwareDescription implements Serializable {
private static final long serialVersionUID = 3380016608300325361L;
+ public static final String FIELD_NAME_CPU_CORES = "cpuCores";
+
+ public static final String FIELD_NAME_SIZE_PHYSICAL_MEMORY = "physicalMemory";
+
+ public static final String FIELD_NAME_SIZE_JVM_HEAP = "freeMemory";
+
+ public static final String FIELD_NAME_SIZE_MANAGED_MEMORY = "managedMemory";
+
/** The number of CPU cores available to the JVM on the compute node. */
+ @JsonProperty(FIELD_NAME_CPU_CORES)
private final int numberOfCPUCores;
/** The size of physical memory in bytes available on the compute node. */
+ @JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY)
private final long sizeOfPhysicalMemory;
/** The size of the JVM heap memory */
+ @JsonProperty(FIELD_NAME_SIZE_JVM_HEAP)
private final long sizeOfJvmHeap;
/** The size of the memory managed by the system for caching, hashing, sorting, ... */
+ @JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY)
private final long sizeOfManagedMemory;
/**
@@ -49,7 +65,12 @@ public final class HardwareDescription implements Serializable {
* @param sizeOfJvmHeap The size of the JVM heap memory.
* @param sizeOfManagedMemory The size of the memory managed by the system for caching, hashing, sorting, ...
*/
- public HardwareDescription(int numberOfCPUCores, long sizeOfPhysicalMemory, long sizeOfJvmHeap, long sizeOfManagedMemory) {
+ @JsonCreator
+ public HardwareDescription(
+ @JsonProperty(FIELD_NAME_CPU_CORES) int numberOfCPUCores,
+ @JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) long sizeOfPhysicalMemory,
+ @JsonProperty(FIELD_NAME_SIZE_JVM_HEAP) long sizeOfJvmHeap,
+ @JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY) long sizeOfManagedMemory) {
this.numberOfCPUCores = numberOfCPUCores;
this.sizeOfPhysicalMemory = sizeOfPhysicalMemory;
this.sizeOfJvmHeap = sizeOfJvmHeap;
@@ -96,6 +117,27 @@ public final class HardwareDescription implements Serializable {
// Utils
// --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HardwareDescription that = (HardwareDescription) o;
+ return numberOfCPUCores == that.numberOfCPUCores &&
+ sizeOfPhysicalMemory == that.sizeOfPhysicalMemory &&
+ sizeOfJvmHeap == that.sizeOfJvmHeap &&
+ sizeOfManagedMemory == that.sizeOfManagedMemory;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
+ }
+
@Override
public String toString() {
return String.format("cores=%d, physMem=%d, heap=%d, managed=%d",
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index cccaf95..60ccb27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -51,6 +52,7 @@ import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -330,6 +332,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
final String taskExecutorAddress,
final ResourceID taskExecutorResourceId,
final SlotReport slotReport,
+ final int dataPort,
+ final HardwareDescription hardwareDescription,
final Time timeout) {
CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
@@ -343,7 +347,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
taskExecutorGateway,
taskExecutorAddress,
taskExecutorResourceId,
- slotReport);
+ slotReport,
+ dataPort,
+ hardwareDescription);
}
},
getMainThreadExecutor());
@@ -486,6 +492,28 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
@Override
+ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
+
+ ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
+
+ for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
+ final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
+
+ taskManagerInfos.add(
+ new TaskManagerInfo(
+ taskExecutor.getInstanceID(),
+ taskExecutor.getTaskExecutorGateway().getAddress(),
+ taskExecutor.getDataPort(),
+ taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
+ slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
+ slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+ taskExecutor.getHardwareDescription()));
+ }
+
+ return CompletableFuture.completedFuture(taskManagerInfos);
+ }
+
+ @Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
final int numberSlots = slotManager.getNumberRegisteredSlots();
final int numberFreeSlots = slotManager.getNumberFreeSlots();
@@ -588,13 +616,17 @@ public abstract class ResourceManager<WorkerType extends Serializable>
* @param taskExecutorAddress address of the TaskExecutor
* @param taskExecutorResourceId ResourceID of the TaskExecutor
* @param slotReport initial slot report from the TaskExecutor
+ * @param dataPort port used for data transfer
+ * @param hardwareDescription hardware description of the registering TaskExecutor
* @return RegistrationResponse
*/
private RegistrationResponse registerTaskExecutorInternal(
- TaskExecutorGateway taskExecutorGateway,
- String taskExecutorAddress,
- ResourceID taskExecutorResourceId,
- SlotReport slotReport) {
+ TaskExecutorGateway taskExecutorGateway,
+ String taskExecutorAddress,
+ ResourceID taskExecutorResourceId,
+ SlotReport slotReport,
+ int dataPort,
+ HardwareDescription hardwareDescription) {
WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
@@ -612,7 +644,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
return new RegistrationResponse.Decline("unrecognized TaskExecutor");
} else {
WorkerRegistration<WorkerType> registration =
- new WorkerRegistration<>(taskExecutorGateway, newWorker);
+ new WorkerRegistration<>(taskExecutorGateway, newWorker, dataPort, hardwareDescription);
taskExecutors.put(taskExecutorResourceId, registration);
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index f67368c..a0cf877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -25,10 +25,12 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.rest.messages.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -79,6 +81,8 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
* @param taskExecutorAddress The address of the TaskExecutor that registers
* @param resourceId The resource ID of the TaskExecutor that registers
* @param slotReport The slot report containing free and allocated task slots
+ * @param dataPort port used for data communication between TaskExecutors
+ * @param hardwareDescription hardware description of the registering TaskExecutor
* @param timeout The timeout for the response.
*
* @return The future to the response by the ResourceManager.
@@ -87,6 +91,8 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
String taskExecutorAddress,
ResourceID resourceId,
SlotReport slotReport,
+ int dataPort,
+ HardwareDescription hardwareDescription,
@RpcTimeout Time timeout);
/**
@@ -162,6 +168,14 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
void disconnectJobManager(JobID jobId, Exception cause);
/**
+ * Requests information about the registered {@link TaskExecutor}.
+ *
+ * @param timeout for the rpc.
+ * @return Future collection of TaskManager information
+ */
+ CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(@RpcTimeout Time timeout);
+
+ /**
* Requests the resource overview. The resource overview provides information about the
* connected TaskManagers, the total number of slots and the number of available slots.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 8f949b0..47697b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager.registration;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;
@@ -30,12 +31,30 @@ public class WorkerRegistration<WorkerType extends Serializable> extends TaskExe
private final WorkerType worker;
- public WorkerRegistration(TaskExecutorGateway taskExecutorGateway, WorkerType worker) {
+ private final int dataPort;
+
+ private final HardwareDescription hardwareDescription;
+
+ public WorkerRegistration(
+ TaskExecutorGateway taskExecutorGateway,
+ WorkerType worker,
+ int dataPort,
+ HardwareDescription hardwareDescription) {
super(taskExecutorGateway);
this.worker = Preconditions.checkNotNull(worker);
+ this.dataPort = dataPort;
+ this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
}
public WorkerType getWorker() {
return worker;
}
+
+ public int getDataPort() {
+ return dataPort;
+ }
+
+ public HardwareDescription getHardwareDescription() {
+ return hardwareDescription;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
new file mode 100644
index 0000000..539c672
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/TaskManagersHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.TaskManagersInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Returns an overview of all registered TaskManagers of the cluster.
+ */
+public class TaskManagersHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+
+ private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
+
+ public TaskManagersHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<T> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> messageHeaders,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+ this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
+ }
+
+ @Override
+ protected CompletableFuture<TaskManagersInfo> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+ @Nonnull T gateway) throws RestHandlerException {
+ Optional<ResourceManagerGateway> resourceManagerGatewayOptional = resourceManagerGatewayRetriever.getNow();
+
+ ResourceManagerGateway resourceManagerGateway = resourceManagerGatewayOptional.orElseThrow(
+ () -> new RestHandlerException("Cannot connect to ResourceManager right now. Please try to refresh.", HttpResponseStatus.NOT_FOUND));
+
+ return resourceManagerGateway
+ .requestTaskManagerInfo(timeout)
+ .thenApply(TaskManagersInfo::new);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
new file mode 100644
index 0000000..979033b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerInfo.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.InstanceIDSerializer;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Objects;
+
+/**
+ * REST response body describing a single {@link TaskExecutor}: its instance id, address,
+ * data port, heartbeat value, slot counts and hardware description.
+ *
+ * <p>Instances are immutable. The JSON layout is defined by the {@code FIELD_NAME_*} constants.
+ */
+public class TaskManagerInfo implements ResponseBody {
+
+	/** JSON key of the instance id. */
+	public static final String FIELD_NAME_INSTANCE_ID = "id";
+
+	/** JSON key of the address. */
+	public static final String FIELD_NAME_ADDRESS = "path";
+
+	/** JSON key of the data port. */
+	public static final String FIELD_NAME_DATA_PORT = "dataPort";
+
+	/** JSON key of the heartbeat value. */
+	public static final String FIELD_NAME_LAST_HEARTBEAT = "timeSinceLastHeartbeat";
+
+	/** JSON key of the total number of slots. */
+	public static final String FIELD_NAME_NUMBER_SLOTS = "slotsNumber";
+
+	/** JSON key of the number of available slots. */
+	public static final String FIELD_NAME_NUMBER_AVAILABLE_SLOTS = "freeSlots";
+
+	/** JSON key of the hardware description. */
+	public static final String FIELD_NAME_HARDWARE = "hardware";
+
+	// Written to JSON through the dedicated InstanceID serializer.
+	@JsonProperty(FIELD_NAME_INSTANCE_ID)
+	@JsonSerialize(using = InstanceIDSerializer.class)
+	private final InstanceID instanceId;
+
+	@JsonProperty(FIELD_NAME_ADDRESS)
+	private final String address;
+
+	@JsonProperty(FIELD_NAME_DATA_PORT)
+	private final int dataPort;
+
+	// NOTE(review): the JSON key says "timeSinceLastHeartbeat" while the field is called
+	// lastHeartbeat; whether this is a duration or a timestamp cannot be told from here.
+	@JsonProperty(FIELD_NAME_LAST_HEARTBEAT)
+	private final long lastHeartbeat;
+
+	@JsonProperty(FIELD_NAME_NUMBER_SLOTS)
+	private final int numberSlots;
+
+	@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS)
+	private final int numberAvailableSlots;
+
+	@JsonProperty(FIELD_NAME_HARDWARE)
+	private final HardwareDescription hardwareDescription;
+
+	/**
+	 * Creates a new task manager description; also used by Jackson for deserialization.
+	 *
+	 * @param instanceId instance id, must not be null
+	 * @param address address, must not be null
+	 * @param dataPort data port
+	 * @param lastHeartbeat heartbeat value
+	 * @param numberSlots total number of slots
+	 * @param numberAvailableSlots number of available slots
+	 * @param hardwareDescription hardware description, must not be null
+	 */
+	@JsonCreator
+	public TaskManagerInfo(
+			@JsonDeserialize(using = InstanceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) InstanceID instanceId,
+			@JsonProperty(FIELD_NAME_ADDRESS) String address,
+			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
+			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
+			@JsonProperty(FIELD_NAME_NUMBER_SLOTS) int numberSlots,
+			@JsonProperty(FIELD_NAME_NUMBER_AVAILABLE_SLOTS) int numberAvailableSlots,
+			@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription) {
+		// Reference-typed arguments are mandatory.
+		this.instanceId = Preconditions.checkNotNull(instanceId);
+		this.address = Preconditions.checkNotNull(address);
+		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
+
+		this.dataPort = dataPort;
+		this.lastHeartbeat = lastHeartbeat;
+		this.numberSlots = numberSlots;
+		this.numberAvailableSlots = numberAvailableSlots;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		final TaskManagerInfo other = (TaskManagerInfo) o;
+
+		// Compare the primitive values first.
+		if (dataPort != other.dataPort
+				|| lastHeartbeat != other.lastHeartbeat
+				|| numberSlots != other.numberSlots
+				|| numberAvailableSlots != other.numberAvailableSlots) {
+			return false;
+		}
+
+		return Objects.equals(instanceId, other.instanceId)
+			&& Objects.equals(address, other.address)
+			&& Objects.equals(hardwareDescription, other.hardwareDescription);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(instanceId, address, dataPort, lastHeartbeat, numberSlots, numberAvailableSlots, hardwareDescription);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
new file mode 100644
index 0000000..bc0f82e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link TaskManagersHandler}: a {@code GET} on {@link #URL} with an
+ * empty request body, answered with a {@link TaskManagersInfo} and status {@code OK}.
+ *
+ * <p>The class is a singleton; obtain it via {@link #getInstance()}.
+ */
+// NOTE(review): the TaskManagersHandler imported by this file comes from the "legacy" handler
+// package - confirm that this is the intended link target.
+public class TaskManagersHeaders implements MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+
+	/** REST path under which the handler is served. */
+	public static final String URL = "/taskmanagers";
+
+	private static final TaskManagersHeaders INSTANCE = new TaskManagersHeaders();
+
+	// Singleton: instances are only handed out through getInstance().
+	private TaskManagersHeaders() {}
+
+	/**
+	 * Returns the shared instance of these headers.
+	 *
+	 * @return the singleton instance
+	 */
+	public static TaskManagersHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	@Override
+	public Class<TaskManagersInfo> getResponseClass() {
+		return TaskManagersInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
new file mode 100644
index 0000000..2149912
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagersInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * REST response body wrapping a collection of {@link TaskManagerInfo} entries under the
+ * JSON key {@link #FIELD_NAME_TASK_MANAGERS}.
+ */
+public class TaskManagersInfo implements ResponseBody {
+
+	/** JSON key of the task manager collection. */
+	public static final String FIELD_NAME_TASK_MANAGERS = "taskmanagers";
+
+	// Kept by reference; the collection handed in is not copied.
+	@JsonProperty(FIELD_NAME_TASK_MANAGERS)
+	private final Collection<TaskManagerInfo> taskManagerInfos;
+
+	/**
+	 * Creates the response body; also used by Jackson for deserialization.
+	 *
+	 * @param taskManagerInfos the task manager entries, must not be null
+	 */
+	@JsonCreator
+	public TaskManagersInfo(
+			@JsonProperty(FIELD_NAME_TASK_MANAGERS) Collection<TaskManagerInfo> taskManagerInfos) {
+		this.taskManagerInfos = Preconditions.checkNotNull(taskManagerInfos);
+	}
+
+	/**
+	 * Returns the wrapped task manager entries.
+	 *
+	 * @return the collection passed to the constructor
+	 */
+	public Collection<TaskManagerInfo> getTaskManagerInfos() {
+		return taskManagerInfos;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null) {
+			return false;
+		}
+
+		if (getClass() != o.getClass()) {
+			return false;
+		}
+
+		final TaskManagersInfo other = (TaskManagersInfo) o;
+		return Objects.equals(taskManagerInfos, other.taskManagerInfos);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(taskManagerInfos);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
new file mode 100644
index 0000000..1c53dcd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDDeserializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Json deserializer which creates an {@link InstanceID} from the string value at the
+ * parser's current position.
+ */
+public class InstanceIDDeserializer extends StdDeserializer<InstanceID> {
+
+	private static final long serialVersionUID = -9058463293913469849L;
+
+	protected InstanceIDDeserializer() {
+		super(InstanceID.class);
+	}
+
+	/**
+	 * Reads the current value as a string, decodes it with
+	 * {@link StringUtils#hexStringToByte(String)} and builds the id from the result.
+	 *
+	 * @param p parser positioned at the value to read
+	 * @param ctxt deserialization context (not used)
+	 * @return the instance id built from the decoded bytes
+	 * @throws IOException if the parser fails to read the value
+	 */
+	@Override
+	public InstanceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+		final String encodedId = p.getValueAsString();
+		final byte[] idBytes = StringUtils.hexStringToByte(encodedId);
+
+		return new InstanceID(idBytes);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
new file mode 100644
index 0000000..f3c0dc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/InstanceIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Json serializer which writes an {@link InstanceID} as a JSON string holding the id's
+ * {@code toString()} representation.
+ */
+public class InstanceIDSerializer extends StdSerializer<InstanceID> {
+
+	private static final long serialVersionUID = 5798852092159615938L;
+
+	protected InstanceIDSerializer() {
+		super(InstanceID.class);
+	}
+
+	/**
+	 * Emits the given id as a single JSON string value.
+	 *
+	 * @param value the id to write
+	 * @param gen generator receiving the string
+	 * @param provider serializer provider (not used)
+	 * @throws IOException if the generator fails to write the value
+	 */
+	@Override
+	public void serialize(InstanceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+		final String textualId = value.toString();
+		gen.writeString(textualId);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c48d188..a348948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -166,6 +167,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
// ------------------------------------------------------------------------
+ private final HardwareDescription hardwareDescription;
+
public TaskExecutor(
RpcService rpcService,
TaskManagerConfiguration taskManagerConfiguration,
@@ -214,6 +217,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
new ResourceManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);
+
+ hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize());
}
// ------------------------------------------------------------------------
@@ -720,6 +725,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
getAddress(),
getResourceID(),
taskSlotTable.createSlotReport(getResourceID()),
+ taskManagerLocation.dataPort(),
+ hardwareDescription,
newLeaderAddress,
newResourceManagerId,
getMainThreadExecutor(),
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index c3d3532..1288009 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -51,6 +52,10 @@ public class TaskExecutorToResourceManagerConnection
private final SlotReport slotReport;
+ private final int dataPort;
+
+ private final HardwareDescription hardwareDescription;
+
private final RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener;
private InstanceID registrationId;
@@ -63,6 +68,8 @@ public class TaskExecutorToResourceManagerConnection
String taskManagerAddress,
ResourceID taskManagerResourceId,
SlotReport slotReport,
+ int dataPort,
+ HardwareDescription hardwareDescription,
String resourceManagerAddress,
ResourceManagerId resourceManagerId,
Executor executor,
@@ -74,6 +81,8 @@ public class TaskExecutorToResourceManagerConnection
this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
this.slotReport = Preconditions.checkNotNull(slotReport);
+ this.dataPort = dataPort;
+ this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
this.registrationListener = Preconditions.checkNotNull(registrationListener);
}
@@ -87,7 +96,9 @@ public class TaskExecutorToResourceManagerConnection
getTargetLeaderId(),
taskManagerAddress,
taskManagerResourceId,
- slotReport);
+ slotReport,
+ dataPort,
+ hardwareDescription);
}
@Override
@@ -135,6 +146,10 @@ public class TaskExecutorToResourceManagerConnection
private final SlotReport slotReport;
+ private final int dataPort;
+
+ private final HardwareDescription hardwareDescription;
+
ResourceManagerRegistration(
Logger log,
RpcService rpcService,
@@ -142,12 +157,16 @@ public class TaskExecutorToResourceManagerConnection
ResourceManagerId resourceManagerId,
String taskExecutorAddress,
ResourceID resourceID,
- SlotReport slotReport) {
+ SlotReport slotReport,
+ int dataPort,
+ HardwareDescription hardwareDescription) {
super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, resourceManagerId);
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
this.resourceID = checkNotNull(resourceID);
this.slotReport = checkNotNull(slotReport);
+ this.dataPort = dataPort;
+ this.hardwareDescription = checkNotNull(hardwareDescription);
}
@Override
@@ -155,7 +174,13 @@ public class TaskExecutorToResourceManagerConnection
ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
Time timeout = Time.milliseconds(timeoutMillis);
- return resourceManager.registerTaskExecutor(taskExecutorAddress, resourceID, slotReport, timeout);
+ return resourceManager.registerTaskExecutor(
+ taskExecutorAddress,
+ resourceID,
+ slotReport,
+ dataPort,
+ hardwareDescription,
+ timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index d8e65a6..2b58b6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -481,6 +482,8 @@ public class ResourceManagerTest extends TestLogger {
@Test
public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
+ final int dataPort = 1234;
+ final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
final String taskManagerAddress = "tm";
final ResourceID taskManagerResourceID = new ResourceID(taskManagerAddress);
final ResourceID resourceManagerResourceID = ResourceID.generate();
@@ -537,6 +540,8 @@ public class ResourceManagerTest extends TestLogger {
taskManagerAddress,
taskManagerResourceID,
slotReport,
+ dataPort,
+ hardwareDescription,
timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 147d180..524c503 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -60,6 +61,10 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
private SlotReport slotReport = new SlotReport();
+ private int dataPort = 1234;
+
+ private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
+
private static String taskExecutorAddress = "/taskExecutor1";
private ResourceID taskExecutorResourceID;
@@ -103,13 +108,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test response successful
CompletableFuture<RegistrationResponse> successfulFuture =
- rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
CompletableFuture<RegistrationResponse> duplicateFuture =
- rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+ rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
RegistrationResponse duplicateResponse = duplicateFuture.get();
assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -128,7 +133,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
- wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+ wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
try {
unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
@@ -152,7 +157,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
// test throw exception when receive a registration from taskExecutor which takes invalid address
String invalidAddress = "/taskExecutor2";
CompletableFuture<RegistrationResponse> invalidAddressFuture =
- rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, slotReport, timeout);
+ rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
} finally {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
new file mode 100644
index 0000000..371e818
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagerInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Marshalling round-trip test for {@link TaskManagerInfo}, driven by
+ * {@link RestResponseMarshallingTestBase}.
+ */
+public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskManagerInfo> {
+
+	/** Source of the random primitive values used for the test instances. */
+	private static final Random RND = new Random();
+
+	@Override
+	protected Class<TaskManagerInfo> getTestResponseClass() {
+		return TaskManagerInfo.class;
+	}
+
+	@Override
+	protected TaskManagerInfo getTestResponseInstance() throws Exception {
+		return createRandomTaskManagerInfo();
+	}
+
+	/**
+	 * Builds a {@link TaskManagerInfo} filled with a fresh instance id, a random UUID string
+	 * as address and random numeric values.
+	 *
+	 * @return a randomly populated task manager info
+	 */
+	static TaskManagerInfo createRandomTaskManagerInfo() {
+		final InstanceID instanceId = new InstanceID();
+		final String address = UUID.randomUUID().toString();
+		final int dataPort = RND.nextInt();
+		final long lastHeartbeat = RND.nextLong();
+		final int numberSlots = RND.nextInt();
+		final int numberAvailableSlots = RND.nextInt();
+		final HardwareDescription hardware = new HardwareDescription(
+			RND.nextInt(),
+			RND.nextLong(),
+			RND.nextLong(),
+			RND.nextLong());
+
+		return new TaskManagerInfo(
+			instanceId,
+			address,
+			dataPort,
+			lastHeartbeat,
+			numberSlots,
+			numberAvailableSlots,
+			hardware);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
new file mode 100644
index 0000000..1f53674
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/TaskManagersInfoTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.rest.messages.TaskManagerInfoTest.createRandomTaskManagerInfo;
+
+/**
+ * Marshalling round-trip test for {@link TaskManagersInfo}, driven by
+ * {@link RestResponseMarshallingTestBase}.
+ */
+public class TaskManagersInfoTest extends RestResponseMarshallingTestBase<TaskManagersInfo> {
+
+	@Override
+	protected Class<TaskManagersInfo> getTestResponseClass() {
+		return TaskManagersInfo.class;
+	}
+
+	/**
+	 * Creates a response holding two randomly populated task manager entries.
+	 */
+	@Override
+	protected TaskManagersInfo getTestResponseInstance() throws Exception {
+		return new TaskManagersInfo(
+			Arrays.asList(
+				createRandomTaskManagerInfo(),
+				createRandomTaskManagerInfo()));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/def87816/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index de807a6..776bdf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -260,7 +261,7 @@ public class TaskExecutorTest extends TestLogger {
// register the mock resource manager gateway
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
- anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
.thenReturn(
CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(
@@ -334,7 +335,7 @@ public class TaskExecutorTest extends TestLogger {
// register resource manager success will trigger monitoring heartbeat target between tm and rm
verify(rmGateway, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).registerTaskExecutor(
- eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
+ eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
@@ -363,7 +364,7 @@ public class TaskExecutorTest extends TestLogger {
// register the mock resource manager gateway
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
- anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
.thenReturn(
CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(
@@ -454,7 +455,7 @@ public class TaskExecutorTest extends TestLogger {
// register resource manager success will trigger monitoring heartbeat target between tm and rm
verify(rmGateway, timeout(verificationTimeout).atLeast(1)).registerTaskExecutor(
- eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), any(Time.class));
+ eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), anyInt(), any(HardwareDescription.class), any(Time.class));
verify(heartbeatManager, timeout(verificationTimeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class));
@@ -495,7 +496,7 @@ public class TaskExecutorTest extends TestLogger {
// register a mock resource manager gateway
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
- anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
new InstanceID(), resourceManagerResourceId, 10L)));
@@ -540,7 +541,7 @@ public class TaskExecutorTest extends TestLogger {
String taskManagerAddress = taskManager.getAddress();
verify(rmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
- eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+ eq(taskManagerAddress), eq(resourceID), eq(slotReport), anyInt(), any(HardwareDescription.class), any(Time.class));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
@@ -567,11 +568,11 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
when(rmGateway1.registerTaskExecutor(
- anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
when(rmGateway2.registerTaskExecutor(
- anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ anyString(), any(ResourceID.class), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
@@ -628,7 +629,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address1, leaderId1);
verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
- eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class));
+ eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
// cancel the leader
@@ -638,7 +639,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address2, leaderId2);
verify(rmGateway2, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
- eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class));
+ eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), anyInt(), any(HardwareDescription.class), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
// check if a concurrent error occurred
@@ -829,6 +830,8 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
+ anyInt(),
+ any(HardwareDescription.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final String jobManagerAddress = "jm";
@@ -947,6 +950,8 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
+ anyInt(),
+ any(HardwareDescription.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
@@ -998,6 +1003,8 @@ public class TaskExecutorTest extends TestLogger {
eq(taskManager.getAddress()),
eq(resourceId),
any(SlotReport.class),
+ anyInt(),
+ any(HardwareDescription.class),
any(Time.class));
taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
@@ -1062,7 +1069,7 @@ public class TaskExecutorTest extends TestLogger {
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new SlotReport());
when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new AllocationID());
- when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), any(Time.class))).thenReturn(
+ when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class))).thenReturn(
CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L)));
TaskExecutor taskManager = new TaskExecutor(
@@ -1096,7 +1103,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address1, resourceManagerId.toUUID());
verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
- eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+ eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
// test that allocating a slot works
@@ -1125,7 +1132,7 @@ public class TaskExecutorTest extends TestLogger {
// re-register
verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
- eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+ eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class));
testLeaderService.notifyListener(address1, resourceManagerId.toUUID());
// now we should be successful because the slots status has been synced
@@ -1185,6 +1192,8 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
+ anyInt(),
+ any(HardwareDescription.class),
any(Time.class))).thenReturn(
CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));