You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/28 08:21:15 UTC
[32/50] [abbrv] flink git commit: [FLINK-4538][FLINK-4348]
ResourceManager slot allocation protocol
[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol
- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderRetrievalListener to SlotManager to keep track of the leader id
- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot
This closes #2463
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53287755
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53287755
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53287755
Branch: refs/heads/flip-6
Commit: 532877557f92f7f1809f5440a642ba090c337f5b
Parents: f9e2dc0
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Sep 1 16:53:31 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 19:24:59 2016 +0200
----------------------------------------------------------------------
.../clusterframework/types/ResourceProfile.java | 2 +-
.../clusterframework/types/ResourceSlot.java | 14 +-
.../resourcemanager/JobMasterRegistration.java | 10 +-
.../resourcemanager/RegistrationResponse.java | 9 +-
.../resourcemanager/ResourceManager.java | 167 +++---
.../resourcemanager/ResourceManagerGateway.java | 2 +-
.../runtime/resourcemanager/SlotAssignment.java | 25 -
.../runtime/resourcemanager/SlotManager.java | 523 -----------------
.../resourcemanager/SlotRequestRegistered.java | 33 ++
.../resourcemanager/SlotRequestRejected.java | 34 ++
.../resourcemanager/SlotRequestReply.java | 41 ++
.../slotmanager/SimpleSlotManager.java | 59 ++
.../slotmanager/SlotManager.java | 579 +++++++++++++++++++
.../flink/runtime/taskexecutor/SlotStatus.java | 5 +-
.../taskexecutor/TaskExecutorGateway.java | 17 +
.../resourcemanager/ResourceManagerHATest.java | 4 +-
.../resourcemanager/SlotManagerTest.java | 538 -----------------
.../slotmanager/SlotManagerTest.java | 554 ++++++++++++++++++
.../slotmanager/SlotProtocolTest.java | 225 +++++++
.../flink/runtime/rpc/TestingRpcService.java | 6 +-
.../runtime/rpc/TestingSerialRpcService.java | 4 +
21 files changed, 1677 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index ff1c4bf..fa3aabc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -68,6 +68,6 @@ public class ResourceProfile implements Serializable {
* @return true if the requirement is matched, otherwise false
*/
public boolean isMatching(ResourceProfile required) {
- return Double.compare(cpuCores, required.getCpuCores()) >= 0 && memoryInMB >= required.getMemoryInMB();
+ return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
index 8a6db5f..5fb8aee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.clusterframework.types;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -26,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique
* identification and resource profile which we can compare to the resource request.
*/
-public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+public class ResourceSlot implements ResourceIDRetrievable {
private static final long serialVersionUID = -5853720153136840674L;
@@ -36,9 +38,13 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable {
/** The resource profile of this slot */
private final ResourceProfile resourceProfile;
- public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+ /** Gateway to the TaskExecutor which owns the slot */
+ private final TaskExecutorGateway taskExecutorGateway;
+
+ public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorGateway taskExecutorGateway) {
this.slotId = checkNotNull(slotId);
this.resourceProfile = checkNotNull(resourceProfile);
+ this.taskExecutorGateway = taskExecutorGateway;
}
@Override
@@ -54,6 +60,10 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable {
return resourceProfile;
}
+ public TaskExecutorGateway getTaskExecutorGateway() {
+ return taskExecutorGateway;
+ }
+
/**
* Check whether required resource profile can be matched by this slot.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 309dcc1..439e56b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,18 +18,26 @@
package org.apache.flink.runtime.resourcemanager;
+import org.apache.flink.api.common.JobID;
+
import java.io.Serializable;
public class JobMasterRegistration implements Serializable {
private static final long serialVersionUID = 8411214999193765202L;
private final String address;
+ private final JobID jobID;
- public JobMasterRegistration(String address) {
+ public JobMasterRegistration(String address, JobID jobID) {
this.address = address;
+ this.jobID = jobID;
}
public String getAddress() {
return address;
}
+
+ public JobID getJobID() {
+ return jobID;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
index fb6c401..796e634 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
@@ -18,26 +18,19 @@
package org.apache.flink.runtime.resourcemanager;
-import org.apache.flink.runtime.instance.InstanceID;
-
import java.io.Serializable;
public class RegistrationResponse implements Serializable {
private static final long serialVersionUID = -2379003255993119993L;
private final boolean isSuccess;
- private final InstanceID instanceID;
- public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+ public RegistrationResponse(boolean isSuccess) {
this.isSuccess = isSuccess;
- this.instanceID = instanceID;
}
public boolean isSuccess() {
return isSuccess;
}
- public InstanceID getInstanceID() {
- return instanceID;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 44c022b..29aba1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.resourcemanager;
import akka.dispatch.Mapper;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -33,6 +35,8 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import java.util.HashMap;
@@ -51,16 +55,28 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
- private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
private final HighAvailabilityServices highAvailabilityServices;
- private LeaderElectionService leaderElectionService = null;
- private UUID leaderSessionID = null;
- public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
+ private LeaderElectionService leaderElectionService;
+
+ private final SlotManager slotManager;
+
+ private UUID leaderSessionID;
+
+ public ResourceManager(
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
+ this.slotManager = slotManager;
}
@Override
@@ -69,7 +85,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
try {
super.start();
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
- leaderElectionService.start(new ResourceManagerLeaderContender());
+ leaderElectionService.start(this);
} catch (Throwable e) {
log.error("A fatal error happened when starting the ResourceManager", e);
throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -94,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
*/
@VisibleForTesting
UUID getLeaderSessionID() {
- return leaderSessionID;
+ return this.leaderSessionID;
}
/**
@@ -105,21 +121,20 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
*/
@RpcMethod
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
- Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+ final Future<JobMasterGateway> jobMasterFuture =
+ getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+ final JobID jobID = jobMasterRegistration.getJobID();
return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
@Override
public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
- InstanceID instanceID;
- if (jobMasterGateways.containsKey(jobMasterGateway)) {
- instanceID = jobMasterGateways.get(jobMasterGateway);
- } else {
- instanceID = new InstanceID();
- jobMasterGateways.put(jobMasterGateway, instanceID);
+ final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
+ if (existingGateway != null) {
+ LOG.info("Replacing existing gateway {} for JobID {} with {}.",
+ existingGateway, jobID, jobMasterGateway);
}
-
- return new RegistrationResponse(true, instanceID);
+ return new RegistrationResponse(true);
}
}, getMainThreadExecutionContext());
}
@@ -131,9 +146,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
* @return Slot assignment
*/
@RpcMethod
- public SlotAssignment requestSlot(SlotRequest slotRequest) {
- System.out.println("SlotRequest: " + slotRequest);
- return new SlotAssignment();
+ public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+ final JobID jobId = slotRequest.getJobId();
+ final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+ if (jobMasterGateway != null) {
+ return slotManager.requestSlot(slotRequest);
+ } else {
+ LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+ return new SlotRequestRejected(slotRequest.getAllocationId());
+ }
}
@@ -154,61 +176,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
- private class ResourceManagerLeaderContender implements LeaderContender {
-
- /**
- * Callback method when current resourceManager is granted leadership
- *
- * @param leaderSessionID unique leadershipID
- */
- @Override
- public void grantLeadership(final UUID leaderSessionID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
- ResourceManager.this.leaderSessionID = leaderSessionID;
- // confirming the leader session ID might be blocking,
- leaderElectionService.confirmLeaderSessionID(leaderSessionID);
- }
- });
- }
- /**
- * Callback method when current resourceManager lose leadership.
- */
- @Override
- public void revokeLeadership() {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was revoked leadership.", getAddress());
- jobMasterGateways.clear();
- leaderSessionID = null;
- }
- });
- }
+ // ------------------------------------------------------------------------
+ // Leader Contender
+ // ------------------------------------------------------------------------
- @Override
- public String getAddress() {
- return ResourceManager.this.getAddress();
- }
+ /**
+ * Callback method when current resourceManager is granted leadership
+ *
+ * @param leaderSessionID unique leadershipID
+ */
+ @Override
+ public void grantLeadership(final UUID leaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+ // confirming the leader session ID might be blocking,
+ leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+ // notify SlotManager
+ slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
+ ResourceManager.this.leaderSessionID = leaderSessionID;
+ }
+ });
+ }
- /**
- * Handles error occurring in the leader election service
- *
- * @param exception Exception being thrown in the leader election service
- */
- @Override
- public void handleError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("ResourceManager received an error from the LeaderElectionService.", exception);
- // terminate ResourceManager in case of an error
- shutDown();
- }
- });
- }
+ /**
+ * Callback method when current resourceManager lose leadership.
+ */
+ @Override
+ public void revokeLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was revoked leadership.", getAddress());
+ jobMasterGateways.clear();
+ ResourceManager.this.leaderSessionID = null;
+ }
+ });
+ }
+
+ /**
+ * Handles error occurring in the leader election service
+ *
+ * @param exception Exception being thrown in the leader election service
+ */
+ @Override
+ public void handleError(final Exception exception) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+ // notify SlotManager
+ slotManager.handleError(exception);
+ // terminate ResourceManager in case of an error
+ shutDown();
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index b5782b0..e5c8b64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param slotRequest Slot request
* @return Future slot assignment
*/
- Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+ Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest);
/**
*
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 695204d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
- private static final long serialVersionUID = -6990813455942742322L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
deleted file mode 100644
index 5c06648..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
- */
-public abstract class SlotManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
-
- /** Gateway to communicate with ResourceManager */
- private final ResourceManagerGateway resourceManagerGateway;
-
- /** All registered slots, including free and allocated slots */
- private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
-
- /** All pending slot requests, waiting available slots to fulfil */
- private final Map<AllocationID, SlotRequest> pendingSlotRequests;
-
- /** All free slots that can be used to be allocated */
- private final Map<SlotID, ResourceSlot> freeSlots;
-
- /** All allocations, we can lookup allocations either by SlotID or AllocationID */
- private final AllocationMap allocationMap;
-
- public SlotManager(ResourceManagerGateway resourceManagerGateway) {
- this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
- this.registeredSlots = new HashMap<>(16);
- this.pendingSlotRequests = new LinkedHashMap<>(16);
- this.freeSlots = new HashMap<>(16);
- this.allocationMap = new AllocationMap();
- }
-
- // ------------------------------------------------------------------------
- // slot managements
- // ------------------------------------------------------------------------
-
- /**
- * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
- * allocation if we don't have enough resource. If we have free slot which can match the request, record
- * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
- * RPC's main thread to avoid race condition).
- *
- * @param request The detailed request of the slot
- */
- public void requestSlot(final SlotRequest request) {
- if (isRequestDuplicated(request)) {
- LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
- return;
- }
-
- // try to fulfil the request with current free slots
- ResourceSlot slot = chooseSlotToUse(request, freeSlots);
- if (slot != null) {
- LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
- request.getAllocationId(), request.getJobId());
-
- // record this allocation in bookkeeping
- allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
-
- // remove selected slot from free pool
- freeSlots.remove(slot.getSlotId());
-
- // TODO: send slot request to TaskManager
- } else {
- LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
- "AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId());
- allocateContainer(request.getResourceProfile());
- pendingSlotRequests.put(request.getAllocationId(), request);
- }
- }
-
- /**
- * Sync slot status with TaskManager's SlotReport.
- */
- public void updateSlotStatus(final SlotReport slotReport) {
- for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
- updateSlotStatus(slotStatus);
- }
- }
-
- /**
- * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
- * or really rejected by TaskManager. We shall retry this request by:
- * <ul>
- * <li>1. verify and clear all the previous allocate information for this request
- * <li>2. try to request slot again
- * </ul>
- * <p>
- * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
- * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
- * but it can be taken care of by rejecting registration at JobManager.
- *
- * @param originalRequest The original slot request
- * @param slotId The target SlotID
- */
- public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
- final AllocationID originalAllocationId = originalRequest.getAllocationId();
- LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
- slotId, originalAllocationId, originalRequest.getJobId());
-
- // verify the allocation info before we do anything
- if (freeSlots.containsKey(slotId)) {
- // this slot is currently empty, no need to de-allocate it from our allocations
- LOG.info("Original slot is somehow empty, retrying this request");
-
- // before retry, we should double check whether this request was allocated by some other ways
- if (!allocationMap.isAllocated(originalAllocationId)) {
- requestSlot(originalRequest);
- } else {
- LOG.info("The failed request has somehow been allocated, SlotID:{}",
- allocationMap.getSlotID(originalAllocationId));
- }
- } else if (allocationMap.isAllocated(slotId)) {
- final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
- // check whether we have an agreement on whom this slot belongs to
- if (originalAllocationId.equals(currentAllocationId)) {
- LOG.info("De-allocate this request and retry");
- allocationMap.removeAllocation(currentAllocationId);
-
- // put this slot back to free pool
- ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
- freeSlots.put(slotId, slot);
-
- // retry the request
- requestSlot(originalRequest);
- } else {
- // the slot is taken by someone else, no need to de-allocate it from our allocations
- LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
-
- // before retry, we should double check whether this request was allocated by some other ways
- if (!allocationMap.isAllocated(originalAllocationId)) {
- requestSlot(originalRequest);
- } else {
- LOG.info("The failed request is somehow been allocated, SlotID:{}",
- allocationMap.getSlotID(originalAllocationId));
- }
- }
- } else {
- LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
- }
- }
-
- /**
- * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
- *
- * @param resourceId The ResourceID of the TaskManager
- */
- public void notifyTaskManagerFailure(final ResourceID resourceId) {
- LOG.info("Resource:{} been notified failure", resourceId);
- final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
- if (slotIdsToRemove != null) {
- for (SlotID slotId : slotIdsToRemove.keySet()) {
- LOG.info("Removing Slot:{} upon resource failure", slotId);
- if (freeSlots.containsKey(slotId)) {
- freeSlots.remove(slotId);
- } else if (allocationMap.isAllocated(slotId)) {
- allocationMap.removeAllocation(slotId);
- } else {
- LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // internal behaviors
- // ------------------------------------------------------------------------
-
- /**
- * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
- * <ul>
- * <li>1. The slot is newly registered.</li>
- * <li>2. The slot has registered, it contains its current status.</li>
- * </ul>
- * <p>
- * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
- * <p>
- * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
- * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet,
- * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
- * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
- * and take next action based on the diff between our information and heartbeat status.
- *
- * @param reportedStatus Reported slot status
- */
- void updateSlotStatus(final SlotStatus reportedStatus) {
- final SlotID slotId = reportedStatus.getSlotID();
- final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler());
-
- if (registerNewSlot(slot)) {
- // we have a newly registered slot
- LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
- if (reportedStatus.getAllocationID() != null) {
- // slot in use, record this in bookkeeping
- allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
- } else {
- handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
- }
- } else {
- // slot exists, update current information
- if (reportedStatus.getAllocationID() != null) {
- // slot is reported in use
- final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
-
- // check whether we also thought this slot is in use
- if (allocationMap.isAllocated(slotId)) {
- // we also think that slot is in use, check whether the AllocationID matches
- final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
- if (!reportedAllocationId.equals(currentAllocationId)) {
- LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
- slotId, currentAllocationId, reportedAllocationId);
-
- // seems we have a disagreement about the slot assignments, need to correct it
- allocationMap.removeAllocation(slotId);
- allocationMap.addAllocation(slotId, reportedAllocationId);
- }
- } else {
- LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
- slotId, reportedAllocationId);
-
- // we thought the slot is free, should correct this information
- allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-
- // remove this slot from free slots pool
- freeSlots.remove(slotId);
- }
- } else {
- // slot is reported empty
-
- // check whether we also thought this slot is empty
- if (allocationMap.isAllocated(slotId)) {
- LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
- slotId, allocationMap.getAllocationID(slotId));
-
- // we thought the slot is in use, correct it
- allocationMap.removeAllocation(slotId);
-
- // we have a free slot!
- handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
- }
- }
- }
- }
-
- /**
- * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
- * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
- * to the free pool.
- *
- * @param freeSlot The free slot
- */
- private void handleFreeSlot(final ResourceSlot freeSlot) {
- SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
-
- if (chosenRequest != null) {
- pendingSlotRequests.remove(chosenRequest.getAllocationId());
-
- LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
- chosenRequest.getAllocationId(), chosenRequest.getJobId());
- allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId());
-
- // TODO: send slot request to TaskManager
- } else {
- freeSlots.put(freeSlot.getSlotId(), freeSlot);
- }
- }
-
- /**
- * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
- * formerly received slot request, it is either in pending list or already been allocated.
- *
- * @param request The slot request
- * @return <tt>true</tt> if the request is duplicated
- */
- private boolean isRequestDuplicated(final SlotRequest request) {
- final AllocationID allocationId = request.getAllocationId();
- return pendingSlotRequests.containsKey(allocationId)
- || allocationMap.isAllocated(allocationId);
- }
-
- /**
- * Try to register slot, and tell if this slot is newly registered.
- *
- * @param slot The ResourceSlot which will be checked and registered
- * @return <tt>true</tt> if we meet a new slot
- */
- private boolean registerNewSlot(final ResourceSlot slot) {
- final SlotID slotId = slot.getSlotId();
- final ResourceID resourceId = slotId.getResourceID();
- if (!registeredSlots.containsKey(resourceId)) {
- registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
- }
- return registeredSlots.get(resourceId).put(slotId, slot) == null;
- }
-
- private ResourceSlot getRegisteredSlot(final SlotID slotId) {
- final ResourceID resourceId = slotId.getResourceID();
- if (!registeredSlots.containsKey(resourceId)) {
- return null;
- }
- return registeredSlots.get(resourceId).get(slotId);
- }
-
- // ------------------------------------------------------------------------
- // Framework specific behavior
- // ------------------------------------------------------------------------
-
- /**
- * Choose a slot to use among all free slots, the behavior is framework specified.
- *
- * @param request The slot request
- * @param freeSlots All slots which can be used
- * @return The slot we choose to use, <tt>null</tt> if we did not find a match
- */
- protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
- final Map<SlotID, ResourceSlot> freeSlots);
-
- /**
- * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
- *
- * @param offeredSlot The free slot
- * @param pendingRequests All the pending slot requests
- * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
- */
- protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
- final Map<AllocationID, SlotRequest> pendingRequests);
-
- /**
- * The framework specific code for allocating a container for specified resource profile.
- *
- * @param resourceProfile The resource profile
- */
- protected abstract void allocateContainer(final ResourceProfile resourceProfile);
-
-
- // ------------------------------------------------------------------------
- // Helper classes
- // ------------------------------------------------------------------------
-
- /**
- * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
- * either by SlotID or AllocationID.
- */
- private static class AllocationMap {
-
- /** All allocated slots (by SlotID) */
- private final Map<SlotID, AllocationID> allocatedSlots;
-
- /** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */
- private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
-
- AllocationMap() {
- this.allocatedSlots = new HashMap<>(16);
- this.allocatedSlotsByAllocationId = new HashMap<>(16);
- }
-
- /**
- * Add a allocation
- *
- * @param slotId The slot id
- * @param allocationId The allocation id
- */
- void addAllocation(final SlotID slotId, final AllocationID allocationId) {
- allocatedSlots.put(slotId, allocationId);
- allocatedSlotsByAllocationId.put(allocationId, slotId);
- }
-
- /**
- * De-allocation with slot id
- *
- * @param slotId The slot id
- */
- void removeAllocation(final SlotID slotId) {
- if (allocatedSlots.containsKey(slotId)) {
- final AllocationID allocationId = allocatedSlots.get(slotId);
- allocatedSlots.remove(slotId);
- allocatedSlotsByAllocationId.remove(allocationId);
- }
- }
-
- /**
- * De-allocation with allocation id
- *
- * @param allocationId The allocation id
- */
- void removeAllocation(final AllocationID allocationId) {
- if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
- SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
- allocatedSlotsByAllocationId.remove(allocationId);
- allocatedSlots.remove(slotId);
- }
- }
-
- /**
- * Check whether allocation exists by slot id
- *
- * @param slotId The slot id
- * @return true if the allocation exists
- */
- boolean isAllocated(final SlotID slotId) {
- return allocatedSlots.containsKey(slotId);
- }
-
- /**
- * Check whether allocation exists by allocation id
- *
- * @param allocationId The allocation id
- * @return true if the allocation exists
- */
- boolean isAllocated(final AllocationID allocationId) {
- return allocatedSlotsByAllocationId.containsKey(allocationId);
- }
-
- AllocationID getAllocationID(final SlotID slotId) {
- return allocatedSlots.get(slotId);
- }
-
- SlotID getSlotID(final AllocationID allocationId) {
- return allocatedSlotsByAllocationId.get(allocationId);
- }
-
- public int size() {
- return allocatedSlots.size();
- }
- }
-
- // ------------------------------------------------------------------------
- // Testing utilities
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- boolean isAllocated(final SlotID slotId) {
- return allocationMap.isAllocated(slotId);
- }
-
- @VisibleForTesting
- boolean isAllocated(final AllocationID allocationId) {
- return allocationMap.isAllocated(allocationId);
- }
-
- /**
- * Add free slots directly to the free pool, this will not trigger pending requests allocation
- *
- * @param slot The resource slot
- */
- @VisibleForTesting
- void addFreeSlot(final ResourceSlot slot) {
- final ResourceID resourceId = slot.getResourceID();
- final SlotID slotId = slot.getSlotId();
-
- if (!registeredSlots.containsKey(resourceId)) {
- registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
- }
- registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
- freeSlots.put(slotId, slot);
- }
-
- @VisibleForTesting
- int getAllocatedSlotCount() {
- return allocationMap.size();
- }
-
- @VisibleForTesting
- int getFreeSlotCount() {
- return freeSlots.size();
- }
-
- @VisibleForTesting
- int getPendingRequestCount() {
- return pendingSlotRequests.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
new file mode 100644
index 0000000..6b7f6dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class SlotRequestRegistered extends SlotRequestReply {
+
+ public SlotRequestRegistered(AllocationID allocationID) {
+ super(allocationID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
new file mode 100644
index 0000000..cb3ec72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Rejection message by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class SlotRequestRejected extends SlotRequestReply {
+
+ public SlotRequestRejected(AllocationID allocationID) {
+ super(allocationID);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
new file mode 100644
index 0000000..1b85d0c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for the replies sent by the ResourceManager in response to a SlotRequest from the JobManager
+ */
+public abstract class SlotRequestReply implements Serializable {
+
+ private static final long serialVersionUID = 42;
+
+ private final AllocationID allocationID;
+
+ public SlotRequestReply(AllocationID allocationID) {
+ this.allocationID = allocationID;
+ }
+
+ public AllocationID getAllocationID() {
+ return allocationID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
new file mode 100644
index 0000000..ef5ce31
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A simple SlotManager which ignores resource profiles.
+ */
+public class SimpleSlotManager extends SlotManager {
+
+ @Override
+ protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+ final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
+ if (slotIterator.hasNext()) {
+ return slotIterator.next();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+ final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
+ if (requestIterator.hasNext()) {
+ return requestIterator.next();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ protected void allocateContainer(ResourceProfile resourceProfile) {
+ // TODO
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
new file mode 100644
index 0000000..96fde7d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and performing slot allocations. It allows requesting
+ * slots from registered TaskManagers and issues container allocation requests in case there are not
+ * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
+ * <p>
+ * The main operation principle of SlotManager is:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
+ * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
+ * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
+ * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager implements LeaderRetrievalListener {
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ /** All registered task managers with ResourceID and gateway. */
+ private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways;
+
+ /** All registered slots, including free and allocated slots */
+ private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+
+ /** All pending slot requests, waiting for available slots to fulfill them */
+ private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+ /** All free slots that are available for allocation */
+ private final Map<SlotID, ResourceSlot> freeSlots;
+
+ /** All allocations, we can look up allocations either by SlotID or AllocationID */
+ private final AllocationMap allocationMap;
+
+ private final FiniteDuration timeout;
+
+ /** The current leader id set by the ResourceManager */
+ private UUID leaderID;
+
+ public SlotManager() {
+ this.registeredSlots = new HashMap<>(16);
+ this.pendingSlotRequests = new LinkedHashMap<>(16);
+ this.freeSlots = new HashMap<>(16);
+ this.allocationMap = new AllocationMap();
+ this.taskManagerGateways = new HashMap<>();
+ this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+ }
+
+
+ // ------------------------------------------------------------------------
+ // slot managements
+ // ------------------------------------------------------------------------
+
+ /**
+ * Request a slot with requirements, we may either fulfill the request or keep it pending. Trigger container
+ * allocation if we don't have enough resources. If we have a free slot which can match the request, record
+ * this allocation and forward the request to the TaskManager via the slot's TaskExecutorGateway (we want this
+ * done by RPC's main thread to avoid race conditions).
+ *
+ * @param request The detailed request of the slot
+ * @return SlotRequestRegistered The confirmation message to be sent to the caller, or null if the request is a duplicate
+ */
+ public SlotRequestRegistered requestSlot(final SlotRequest request) {
+ final AllocationID allocationId = request.getAllocationId();
+ if (isRequestDuplicated(request)) {
+ LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+ return null;
+ }
+
+ // try to fulfil the request with current free slots
+ final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+ if (slot != null) {
+ LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
+ allocationId, request.getJobId());
+
+ // record this allocation in bookkeeping
+ allocationMap.addAllocation(slot.getSlotId(), allocationId);
+
+ // remove selected slot from free pool
+ freeSlots.remove(slot.getSlotId());
+
+ final Future<SlotRequestReply> slotRequestReplyFuture =
+ slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+ // TODO handle timeouts and response
+ } else {
+ LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+ "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
+ allocateContainer(request.getResourceProfile());
+ pendingSlotRequests.put(allocationId, request);
+ }
+
+ return new SlotRequestRegistered(allocationId);
+ }
+
+ /**
+ * Sync slot status with TaskManager's SlotReport.
+ */
+ public void updateSlotStatus(final SlotReport slotReport) {
+ for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+ updateSlotStatus(slotStatus);
+ }
+ }
+
+ /**
+ * Registers a TaskExecutor
+ * @param resourceID TaskExecutor's ResourceID
+ * @param gateway TaskExecutor's gateway
+ */
+ public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
+ this.taskManagerGateways.put(resourceID, gateway);
+ }
+
+ /**
+ * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
+ * or really rejected by TaskManager. We shall retry this request by:
+ * <ul>
+ * <li>1. verify and clear all the previous allocate information for this request
+ * <li>2. try to request slot again
+ * </ul>
+ * <p>
+ * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
+ * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
+ * but it can be taken care of by rejecting registration at JobManager.
+ *
+ * @param originalRequest The original slot request
+ * @param slotId The target SlotID
+ */
+ public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+ final AllocationID originalAllocationId = originalRequest.getAllocationId();
+ LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
+ slotId, originalAllocationId, originalRequest.getJobId());
+
+ // verify the allocation info before we do anything
+ if (freeSlots.containsKey(slotId)) {
+ // this slot is currently empty, no need to de-allocate it from our allocations
+ LOG.info("Original slot is somehow empty, retrying this request");
+
+ // before retry, we should double check whether this request was allocated by some other ways
+ if (!allocationMap.isAllocated(originalAllocationId)) {
+ requestSlot(originalRequest);
+ } else {
+ LOG.info("The failed request has somehow been allocated, SlotID:{}",
+ allocationMap.getSlotID(originalAllocationId));
+ }
+ } else if (allocationMap.isAllocated(slotId)) {
+ final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+ // check whether we have an agreement on whom this slot belongs to
+ if (originalAllocationId.equals(currentAllocationId)) {
+ LOG.info("De-allocate this request and retry");
+ allocationMap.removeAllocation(currentAllocationId);
+
+ // put this slot back to free pool
+ ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+ freeSlots.put(slotId, slot);
+
+ // retry the request
+ requestSlot(originalRequest);
+ } else {
+ // the slot is taken by someone else, no need to de-allocate it from our allocations
+ LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+
+ // before retry, we should double check whether this request was allocated by some other ways
+ if (!allocationMap.isAllocated(originalAllocationId)) {
+ requestSlot(originalRequest);
+ } else {
+ LOG.info("The failed request is somehow been allocated, SlotID:{}",
+ allocationMap.getSlotID(originalAllocationId));
+ }
+ }
+ } else {
+ LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+ }
+ }
+
+ /**
+ * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+ *
+ * @param resourceId The ResourceID of the TaskManager
+ */
+ public void notifyTaskManagerFailure(final ResourceID resourceId) {
+ LOG.info("Resource:{} been notified failure", resourceId);
+ taskManagerGateways.remove(resourceId);
+ final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
+ if (slotIdsToRemove != null) {
+ for (SlotID slotId : slotIdsToRemove.keySet()) {
+ LOG.info("Removing Slot: {} upon resource failure", slotId);
+ if (freeSlots.containsKey(slotId)) {
+ freeSlots.remove(slotId);
+ } else if (allocationMap.isAllocated(slotId)) {
+ allocationMap.removeAllocation(slotId);
+ } else {
+ LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // internal behaviors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
+ * <ul>
+ * <li>1. The slot is newly registered.</li>
+ * <li>2. The slot has registered, it contains its current status.</li>
+ * </ul>
+ * <p>
+ * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
+ * <p>
+ * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
+ * is. We may have some updates on the slot's allocation that are not yet reflected by TaskManager's heartbeat,
+ * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
+ * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
+ * and take next action based on the diff between our information and heartbeat status.
+ *
+ * @param reportedStatus Reported slot status
+ */
+ void updateSlotStatus(final SlotStatus reportedStatus) {
+ final SlotID slotId = reportedStatus.getSlotID();
+
+ final TaskExecutorGateway taskExecutorGateway = taskManagerGateways.get(slotId.getResourceID());
+ if (taskExecutorGateway == null) {
+ LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
+ slotId.getResourceID());
+ return;
+ }
+
+ final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler(), taskExecutorGateway);
+
+ if (registerNewSlot(slot)) {
+ // we have a newly registered slot
+ LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
+
+ if (reportedStatus.getAllocationID() != null) {
+ // slot in use, record this in bookkeeping
+ allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+ } else {
+ handleFreeSlot(slot);
+ }
+ } else {
+ // slot exists, update current information
+ if (reportedStatus.getAllocationID() != null) {
+ // slot is reported in use
+ final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+
+ // check whether we also thought this slot is in use
+ if (allocationMap.isAllocated(slotId)) {
+ // we also think that slot is in use, check whether the AllocationID matches
+ final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+ if (!reportedAllocationId.equals(currentAllocationId)) {
+ LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+ slotId, currentAllocationId, reportedAllocationId);
+
+ // seems we have a disagreement about the slot assignments, need to correct it
+ allocationMap.removeAllocation(slotId);
+ allocationMap.addAllocation(slotId, reportedAllocationId);
+ }
+ } else {
+ LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+ slotId, reportedAllocationId);
+
+ // we thought the slot is free, should correct this information
+ allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+
+ // remove this slot from free slots pool
+ freeSlots.remove(slotId);
+ }
+ } else {
+ // slot is reported empty
+
+ // check whether we also thought this slot is empty
+ if (allocationMap.isAllocated(slotId)) {
+ LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+ slotId, allocationMap.getAllocationID(slotId));
+
+ // we thought the slot is in use, correct it
+ allocationMap.removeAllocation(slotId);
+
+ // we have a free slot!
+ handleFreeSlot(slot);
+ }
+ }
+ }
+ }
+
+ /**
+ * When we have a free slot, try to fulfill a pending request first. {@link #chooseRequestToFulfill} is
+ * asked to pick one of the pending requests for this slot. If it returns a request, the request is
+ * removed from the pending requests, the assignment is recorded in the allocation bookkeeping and a
+ * slot request is sent to the slot's TaskExecutor gateway. If it returns <tt>null</tt>, the slot is
+ * added to the free pool.
+ *
+ * <p>NOTE(review): the future returned by the TaskExecutor is not inspected yet (see the TODO in the
+ * method body), so the recorded allocation is kept whatever the TaskExecutor answers.
+ *
+ * @param freeSlot The free slot
+ */
+ private void handleFreeSlot(final ResourceSlot freeSlot) {
+ SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+
+ if (chosenRequest != null) {
+ final AllocationID allocationId = chosenRequest.getAllocationId();
+ // the request is about to be served, so it is no longer pending
+ pendingSlotRequests.remove(allocationId);
+
+ LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
+ allocationId, chosenRequest.getJobId());
+ // record the assignment before the TaskExecutor is contacted
+ allocationMap.addAllocation(freeSlot.getSlotId(), allocationId);
+
+ // the current leader id is passed along with the request
+ final Future<SlotRequestReply> slotRequestReplyFuture =
+ freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+ // TODO handle timeouts and response
+ } else {
+ // no pending request matched this slot, keep it for later requests
+ freeSlots.put(freeSlot.getSlotId(), freeSlot);
+ }
+ }
+
+ /**
+ * Tells whether a slot request with the same AllocationID has been received before. A request counts
+ * as duplicated if its AllocationID is either still among the pending requests or is already recorded
+ * in the allocation bookkeeping.
+ *
+ * @param request The slot request
+ * @return <tt>true</tt> if the request is duplicated
+ */
+ private boolean isRequestDuplicated(final SlotRequest request) {
+ final AllocationID requestedId = request.getAllocationId();
+
+ // still waiting for a slot
+ if (pendingSlotRequests.containsKey(requestedId)) {
+ return true;
+ }
+
+ // otherwise it is a duplicate only if it already holds a slot
+ return allocationMap.isAllocated(requestedId);
+ }
+
+ /**
+ * Registers the given slot under the ResourceID of its SlotID and reports whether the slot was
+ * unknown before. A slot that was registered earlier under the same SlotID is replaced by the given one.
+ *
+ * @param slot The ResourceSlot which will be checked and registered
+ * @return <tt>true</tt> if we meet a new slot
+ */
+ private boolean registerNewSlot(final ResourceSlot slot) {
+ final SlotID id = slot.getSlotId();
+ final ResourceID owner = id.getResourceID();
+
+ // first slot seen for this resource: create its slot table
+ if (!registeredSlots.containsKey(owner)) {
+ registeredSlots.put(owner, new HashMap<SlotID, ResourceSlot>());
+ }
+
+ // put() hands back the previous mapping, which is null for a slot seen for the first time
+ final ResourceSlot previous = registeredSlots.get(owner).put(id, slot);
+ return previous == null;
+ }
+
+ /**
+ * Looks up a registered slot by its SlotID.
+ *
+ * @param slotId The slot id
+ * @return The registered slot, <tt>null</tt> if no slots are registered for the ResourceID of this
+ * SlotID or the slot itself is not registered
+ */
+ private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+ final ResourceID owner = slotId.getResourceID();
+ return registeredSlots.containsKey(owner)
+ ? registeredSlots.get(owner).get(slotId)
+ : null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Framework specific behavior
+ // ------------------------------------------------------------------------
+
+ /**
+ * Choose a slot to use among all free slots. The selection strategy is framework specific and is
+ * provided by the implementing subclass.
+ *
+ * @param request The slot request
+ * @param freeSlots All slots which can be used
+ * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+ */
+ protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
+ final Map<SlotID, ResourceSlot> freeSlots);
+
+ /**
+ * Choose a pending request to fulfill when we have a free slot. The selection strategy is framework
+ * specific and is provided by the implementing subclass.
+ *
+ * @param offeredSlot The free slot
+ * @param pendingRequests All the pending slot requests
+ * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+ */
+ protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
+ final Map<AllocationID, SlotRequest> pendingRequests);
+
+ /**
+ * The framework specific code for allocating a container for the specified resource profile.
+ *
+ * @param resourceProfile The resource profile
+ */
+ protected abstract void allocateContainer(final ResourceProfile resourceProfile);
+
+ // ------------------------------------------------------------------------
+ // Helper classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
+ * either by SlotID or AllocationID.
+ *
+ * <p>The two internal maps are kept as exact inverses of each other: every SlotID is bound to at most
+ * one AllocationID and every AllocationID to at most one SlotID.
+ */
+ private static class AllocationMap {
+
+ /** All allocated slots (by SlotID) */
+ private final Map<SlotID, AllocationID> allocatedSlots;
+
+ /** All allocated slots (by AllocationID), it is an inverse view of allocatedSlots */
+ private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+ AllocationMap() {
+ this.allocatedSlots = new HashMap<>(16);
+ this.allocatedSlotsByAllocationId = new HashMap<>(16);
+ }
+
+ /**
+ * Add an allocation. If the slot or the allocation id is already part of another allocation, that
+ * older allocation is dropped first so that both views stay consistent with each other.
+ *
+ * @param slotId The slot id
+ * @param allocationId The allocation id
+ */
+ void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+ // Overwriting only one direction would leave a stale entry in the other map, which a later
+ // removeAllocation() could not clean up correctly. Drop both old bindings before inserting.
+ removeAllocation(slotId);
+ removeAllocation(allocationId);
+
+ allocatedSlots.put(slotId, allocationId);
+ allocatedSlotsByAllocationId.put(allocationId, slotId);
+ }
+
+ /**
+ * De-allocation with slot id. Does nothing if the slot is not allocated.
+ *
+ * @param slotId The slot id
+ */
+ void removeAllocation(final SlotID slotId) {
+ if (allocatedSlots.containsKey(slotId)) {
+ final AllocationID allocationId = allocatedSlots.remove(slotId);
+ allocatedSlotsByAllocationId.remove(allocationId);
+ }
+ }
+
+ /**
+ * De-allocation with allocation id. Does nothing if the allocation id is unknown.
+ *
+ * @param allocationId The allocation id
+ */
+ void removeAllocation(final AllocationID allocationId) {
+ if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
+ final SlotID slotId = allocatedSlotsByAllocationId.remove(allocationId);
+ allocatedSlots.remove(slotId);
+ }
+ }
+
+ /**
+ * Check whether allocation exists by slot id
+ *
+ * @param slotId The slot id
+ * @return true if the allocation exists
+ */
+ boolean isAllocated(final SlotID slotId) {
+ return allocatedSlots.containsKey(slotId);
+ }
+
+ /**
+ * Check whether allocation exists by allocation id
+ *
+ * @param allocationId The allocation id
+ * @return true if the allocation exists
+ */
+ boolean isAllocated(final AllocationID allocationId) {
+ return allocatedSlotsByAllocationId.containsKey(allocationId);
+ }
+
+ /**
+ * Get the allocation id a slot is bound to
+ *
+ * @param slotId The slot id
+ * @return The allocation id, <tt>null</tt> if the slot is not allocated
+ */
+ AllocationID getAllocationID(final SlotID slotId) {
+ return allocatedSlots.get(slotId);
+ }
+
+ /**
+ * Get the slot an allocation id is bound to
+ *
+ * @param allocationId The allocation id
+ * @return The slot id, <tt>null</tt> if the allocation id is unknown
+ */
+ SlotID getSlotID(final AllocationID allocationId) {
+ return allocatedSlotsByAllocationId.get(allocationId);
+ }
+
+ /**
+ * Number of allocations
+ *
+ * @return The number of allocated slots
+ */
+ public int size() {
+ return allocatedSlots.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // High availability
+ // ------------------------------------------------------------------------
+
+ /**
+ * Stores the reported leader session id in {@code leaderID}, the value that is handed to the
+ * TaskExecutor together with a slot request. The leader address is not used here.
+ *
+ * @param leaderAddress The reported leader address (ignored)
+ * @param leaderSessionID The reported leader session id
+ */
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ this.leaderID = leaderSessionID;
+ }
+
+ /**
+ * Logs the given exception at error level; no further action is taken.
+ *
+ * @param exception The reported exception
+ */
+ @Override
+ public void handleError(Exception exception) {
+ LOG.error("Slot Manager received an error from the leader service", exception);
+ }
+
+ // ------------------------------------------------------------------------
+ // Testing utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Check whether the allocation bookkeeping contains an allocation for the given slot
+ *
+ * @param slotId The slot id
+ * @return true if the slot is recorded as allocated
+ */
+ @VisibleForTesting
+ boolean isAllocated(final SlotID slotId) {
+ return allocationMap.isAllocated(slotId);
+ }
+
+ /**
+ * Check whether the allocation bookkeeping contains an allocation with the given allocation id
+ *
+ * @param allocationId The allocation id
+ * @return true if the allocation id is recorded as allocated
+ */
+ @VisibleForTesting
+ boolean isAllocated(final AllocationID allocationId) {
+ return allocationMap.isAllocated(allocationId);
+ }
+
+ /**
+ * Registers the slot and puts it straight into the free pool. Pending requests are not matched
+ * against the slot, so this will not trigger pending requests allocation.
+ *
+ * @param slot The resource slot
+ */
+ @VisibleForTesting
+ void addFreeSlot(final ResourceSlot slot) {
+ final ResourceID owner = slot.getResourceID();
+ final SlotID id = slot.getSlotId();
+
+ // make sure there is a slot table for the owning resource
+ if (!registeredSlots.containsKey(owner)) {
+ registeredSlots.put(owner, new HashMap<SlotID, ResourceSlot>());
+ }
+
+ // the slot is both registered and immediately available
+ registeredSlots.get(owner).put(id, slot);
+ freeSlots.put(id, slot);
+ }
+
+ /**
+ * @return The number of allocations currently recorded in the allocation bookkeeping
+ */
+ @VisibleForTesting
+ int getAllocatedSlotCount() {
+ return allocationMap.size();
+ }
+
+ /**
+ * @return The number of slots currently in the free pool
+ */
+ @VisibleForTesting
+ int getFreeSlotCount() {
+ return freeSlots.size();
+ }
+
+ /**
+ * @return The number of slot requests that are currently pending
+ */
+ @VisibleForTesting
+ int getPendingRequestCount() {
+ return pendingSlotRequests.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 744b674..0f57bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -50,7 +50,10 @@ public class SlotStatus implements Serializable {
this(slotID, profiler, null, null);
}
- public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
+ public SlotStatus(
+ SlotID slotID, ResourceProfile profiler,
+ JobID jobID,
+ AllocationID allocationID) {
this.slotID = checkNotNull(slotID, "slotID cannot be null");
this.profiler = checkNotNull(profiler, "profile cannot be null");
this.allocationID = allocationID;
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 6c99706..7257436 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,7 +18,12 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import java.util.UUID;
@@ -32,4 +37,16 @@ public interface TaskExecutorGateway extends RpcGateway {
// ------------------------------------------------------------------------
void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
+
+ /**
+ * Sent by the ResourceManager to the TaskExecutor
+ * @param allocationID id for the request
+ * @param resourceManagerLeaderID current leader id of the ResourceManager
+ * @param timeout timeout argument, marked with {@code @RpcTimeout}
+ * @return SlotRequestReply Answer to the request
+ */
+
+ Future<SlotRequestReply> requestSlot(
+ AllocationID allocationID,
+ UUID resourceManagerLeaderID,
+ @RpcTimeout FiniteDuration timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53287755/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 5799e62..8183c0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.MainThreadExecutor;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -53,7 +54,8 @@ public class ResourceManagerHATest {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
- final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+ SlotManager slotManager = mock(SlotManager.class);
+ final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager);
resourceManager.start();
// before grant leadership, resourceManager's leaderId is null
Assert.assertNull(resourceManager.getLeaderSessionID());