You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/13 13:20:12 UTC
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/4823
[FLINK-7832] [flip6] Extend SlotManager to report free slots per TM
## What is the purpose of the change
Extend the `SlotManager` such that we count the free slots per `TaskManager`. This has the advantage that we don't have to iterate over all registered slots and aggregate their state in order to decide whether a TaskManager is idle or not. Moreover, it allows to easily query how many free slots every `TaskManager` still has.
## Brief change log
- Fail if slot belongs to a unregistered TaskManager
- Add more sanity checks
- Make the TaskManagerSlot state transitions clearer
- Introduce proper TaskManagerSlot state enum and state transitions
- Refactor SlotManager for better maintainability
- Add free slot counting
## Verifying this change
This change is already covered by existing tests, such as `SlotManagerTest`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink extendSlotManager
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4823.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4823
----
commit 77ece83513d9a79f2f0c3e7024d91243026c334c
Author: Till <ti...@gmail.com>
Date: 2017-10-13T07:19:51Z
[FLINK-7832] [flip6] Extend SlotManager to report free slots per TM
Fail if slot belongs to a unregistered TaskManager
Add more sanity checks
Make the TaskManagerSlot state transitions clearer
Introduce proper TaskManagerSlot state enum
Refactor SlotManager for better maintainability
----
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145949083
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -556,14 +537,30 @@ private void registerSlot(
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
- TaskManagerSlot slot = slots.get(slotId);
+ final TaskManagerSlot slot = slots.get(slotId);
- if (null != slot) {
- // we assume the given allocation id to be the ground truth (coming from the TM)
- slot.setAllocationId(allocationId);
+ if (slot != null) {
+ final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ updateSlotInternal(slot, taskManagerRegistration, allocationId);
+
+ return true;
+ } else {
+ throw new IllegalStateException("Trying to update a slot from a TaskManager " +
--- End diff --
Would say this is not necessary.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145117077
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
TaskManagerSlot slot = slots.get(slotId);
if (null != slot) {
- if (slot.isAllocated()) {
+ if (slot.getState() == TaskManagerSlot.State.ALLOCATED) {
if (Objects.equals(allocationId, slot.getAllocationId())) {
- // free the slot
- slot.setAllocationId(null);
- fulfilledSlotRequests.remove(allocationId);
-
- if (slot.isFree()) {
- handleFreeSlot(slot);
- }
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
- if (null != taskManagerRegistration) {
- if (anySlotUsed(taskManagerRegistration.getSlots())) {
- taskManagerRegistration.markUsed();
- } else {
- taskManagerRegistration.markIdle();
- }
+ if (taskManagerRegistration == null) {
+ throw new IllegalStateException("Trying to free a slot from a TaskManager " +
--- End diff --
Would it be useful to also have the slot ID in the exception?
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145949796
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -556,14 +537,30 @@ private void registerSlot(
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
- TaskManagerSlot slot = slots.get(slotId);
+ final TaskManagerSlot slot = slots.get(slotId);
- if (null != slot) {
- // we assume the given allocation id to be the ground truth (coming from the TM)
- slot.setAllocationId(allocationId);
+ if (slot != null) {
+ final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ updateSlotInternal(slot, taskManagerRegistration, allocationId);
+
+ return true;
+ } else {
+ throw new IllegalStateException("Trying to update a slot from a TaskManager " +
+ slot.getInstanceId() + " which has not been registered.");
+ }
+ } else {
+ LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
- if (null != allocationId) {
- if (slot.hasPendingSlotRequest()){
+ return false;
+ }
+ }
+
+ private void updateSlotInternal(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
--- End diff --
True. The intention was to have the lookup logic in `updateSlot` because this is not needed for `freeSlot` and `removeSlotRequest`. Will rename `updateSlotInternal` into `updateSlotState`.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4823
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145947839
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --
I think `Objects.equals` takes care of this.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145950011
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --
But will move it to the top to make it clearer.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145948351
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
+ assignedSlotRequest = null;
+ }
+
+ public void updateAllocation(AllocationID allocationId) {
--- End diff --
`updateAllocation` is used to model the state transition from `FREE` to `ALLOCATED`. This state transition is necessary, because the ground truth is hold by the `TaskExecutor`. Thus, it can be the case that the `ResourceManager` receives a `SlotReport` where it says that slot `x` is allocated with this `allocationId`. This can be the case, for example, in the RM failover case.
---
[GitHub] flink issue #4823: [FLINK-7832] [flip6] Extend SlotManager to report free sl...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4823
Thanks for your review @zentol. I've addressed your comments and hope to have clarified the `updateAllocation` call for the state transition `FREE` -> `ALLOCATED`.
If there are not other objections, then I would like to merge this PR once Travis gives green light.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r144810955
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --
The null check should be done before comparing the id to the requests allocation id.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145119294
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -556,14 +537,30 @@ private void registerSlot(
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
- TaskManagerSlot slot = slots.get(slotId);
+ final TaskManagerSlot slot = slots.get(slotId);
- if (null != slot) {
- // we assume the given allocation id to be the ground truth (coming from the TM)
- slot.setAllocationId(allocationId);
+ if (slot != null) {
+ final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ updateSlotInternal(slot, taskManagerRegistration, allocationId);
+
+ return true;
+ } else {
+ throw new IllegalStateException("Trying to update a slot from a TaskManager " +
+ slot.getInstanceId() + " which has not been registered.");
+ }
+ } else {
+ LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
- if (null != allocationId) {
- if (slot.hasPendingSlotRequest()){
+ return false;
+ }
+ }
+
+ private void updateSlotInternal(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
--- End diff --
isn't it odd to have an `updateSlotInternal` method, when a private `updateSlot` method already exists? (more or less a naming issue)
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145117281
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -556,14 +537,30 @@ private void registerSlot(
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
- TaskManagerSlot slot = slots.get(slotId);
+ final TaskManagerSlot slot = slots.get(slotId);
- if (null != slot) {
- // we assume the given allocation id to be the ground truth (coming from the TM)
- slot.setAllocationId(allocationId);
+ if (slot != null) {
+ final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (taskManagerRegistration != null) {
+ updateSlotInternal(slot, taskManagerRegistration, allocationId);
+
+ return true;
+ } else {
+ throw new IllegalStateException("Trying to update a slot from a TaskManager " +
--- End diff --
same as above
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145948519
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
TaskManagerSlot slot = slots.get(slotId);
if (null != slot) {
- if (slot.isAllocated()) {
+ if (slot.getState() == TaskManagerSlot.State.ALLOCATED) {
if (Objects.equals(allocationId, slot.getAllocationId())) {
- // free the slot
- slot.setAllocationId(null);
- fulfilledSlotRequests.remove(allocationId);
-
- if (slot.isFree()) {
- handleFreeSlot(slot);
- }
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
- if (null != taskManagerRegistration) {
- if (anySlotUsed(taskManagerRegistration.getSlots())) {
- taskManagerRegistration.markUsed();
- } else {
- taskManagerRegistration.markIdle();
- }
+ if (taskManagerRegistration == null) {
+ throw new IllegalStateException("Trying to free a slot from a TaskManager " +
--- End diff --
Yes, will add it.
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145958985
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
--- End diff --
this wasn't about correctness but clarity, so it's good that you changed it :)
---
[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4823#discussion_r145129240
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
return allocationId;
}
- public void setAllocationId(AllocationID allocationId) {
- this.allocationId = allocationId;
- }
-
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
- public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
- this.assignedSlotRequest = assignedSlotRequest;
- }
-
public InstanceID getInstanceId() {
return taskManagerConnection.getInstanceID();
}
+ public void freeSlot() {
+ Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
+
+ state = State.FREE;
+ allocationId = null;
+ }
+
+ public void clearPendingSlotRequest() {
+ Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
+
+ state = State.FREE;
+ assignedSlotRequest = null;
+ }
+
+ public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
+
+ state = State.PENDING;
+ assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
+ }
+
+ public void completeAllocation(AllocationID allocationId) {
+ Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
+ Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
+
+ state = State.ALLOCATED;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
+ assignedSlotRequest = null;
+ }
+
+ public void updateAllocation(AllocationID allocationId) {
--- End diff --
Can you explain the difference between updateAllocation and completeAllocation in terms of when they are used? (I would've expected that the slot lifecycle is along the lines of FREE <-> PENDING -> ALLOCATED -> FREE), but this doesn't appear to be the case as a free slot can be allocated without ever being in a pending state).
---