You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/13 13:20:12 UTC

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4823

    [FLINK-7832] [flip6] Extend SlotManager to report free slots per TM

    ## What is the purpose of the change
    
    Extend the `SlotManager` such that we count the free slots per `TaskManager`. This has the advantage that we don't have to iterate over all registered slots and aggregate their state in order to decide whether a TaskManager is idle or not. Moreover, it allows to easily query how many free slots every `TaskManager` still has.
    
    ## Brief change log
    
    - Fail if slot belongs to a unregistered TaskManager
    - Add more sanity checks
    - Make the TaskManagerSlot state transitions clearer
    - Introduce proper TaskManagerSlot state enum and state transitions
    - Refactor SlotManager for better maintainability
    - Add free slot counting
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `SlotManagerTest`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink extendSlotManager

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4823.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4823
    
----
commit 77ece83513d9a79f2f0c3e7024d91243026c334c
Author: Till <ti...@gmail.com>
Date:   2017-10-13T07:19:51Z

    [FLINK-7832] [flip6] Extend SlotManager to report free slots per TM
    
    Fail if slot belongs to a unregistered TaskManager
    
    Add more sanity checks
    
    Make the TaskManagerSlot state transitions clearer
    
    Introduce proper TaskManagerSlot state enum
    
    Refactor SlotManager for better maintainability

----


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145949083
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -556,14 +537,30 @@ private void registerSlot(
     	 * @return True if the slot could be updated; otherwise false
     	 */
     	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
    -		TaskManagerSlot slot = slots.get(slotId);
    +		final TaskManagerSlot slot = slots.get(slotId);
     
    -		if (null != slot) {
    -			// we assume the given allocation id to be the ground truth (coming from the TM)
    -			slot.setAllocationId(allocationId);
    +		if (slot != null) {
    +			final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
    +
    +			if (taskManagerRegistration != null) {
    +				updateSlotInternal(slot, taskManagerRegistration, allocationId);
    +
    +				return true;
    +			} else {
    +				throw new IllegalStateException("Trying to update a slot from a TaskManager " +
    --- End diff --
    
    Would say this is not necessary.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145117077
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
     		TaskManagerSlot slot = slots.get(slotId);
     
     		if (null != slot) {
    -			if (slot.isAllocated()) {
    +			if (slot.getState() == TaskManagerSlot.State.ALLOCATED) {
     				if (Objects.equals(allocationId, slot.getAllocationId())) {
    -					// free the slot
    -					slot.setAllocationId(null);
    -					fulfilledSlotRequests.remove(allocationId);
    -
    -					if (slot.isFree()) {
    -						handleFreeSlot(slot);
    -					}
     
     					TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
     
    -					if (null != taskManagerRegistration) {
    -						if (anySlotUsed(taskManagerRegistration.getSlots())) {
    -							taskManagerRegistration.markUsed();
    -						} else {
    -							taskManagerRegistration.markIdle();
    -						}
    +					if (taskManagerRegistration == null) {
    +						throw new IllegalStateException("Trying to free a slot from a TaskManager " +
    --- End diff --
    
    Would it be useful to also have the slot ID in the exception?


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145949796
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -556,14 +537,30 @@ private void registerSlot(
     	 * @return True if the slot could be updated; otherwise false
     	 */
     	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
    -		TaskManagerSlot slot = slots.get(slotId);
    +		final TaskManagerSlot slot = slots.get(slotId);
     
    -		if (null != slot) {
    -			// we assume the given allocation id to be the ground truth (coming from the TM)
    -			slot.setAllocationId(allocationId);
    +		if (slot != null) {
    +			final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
    +
    +			if (taskManagerRegistration != null) {
    +				updateSlotInternal(slot, taskManagerRegistration, allocationId);
    +
    +				return true;
    +			} else {
    +				throw new IllegalStateException("Trying to update a slot from a TaskManager " +
    +					slot.getInstanceId() + " which has not been registered.");
    +			}
    +		} else {
    +			LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
     
    -			if (null != allocationId) {
    -				if (slot.hasPendingSlotRequest()){
    +			return false;
    +		}
    +	}
    +
    +	private void updateSlotInternal(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
    --- End diff --
    
    True. The intention was to have the lookup logic in `updateSlot` because this is not needed for `freeSlot` and `removeSlotRequest`. Will rename `updateSlotInternal` into `updateSlotState`.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4823


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145947839
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
     		return allocationId;
     	}
     
    -	public void setAllocationId(AllocationID allocationId) {
    -		this.allocationId = allocationId;
    -	}
    -
     	public PendingSlotRequest getAssignedSlotRequest() {
     		return assignedSlotRequest;
     	}
     
    -	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
    -		this.assignedSlotRequest = assignedSlotRequest;
    -	}
    -
     	public InstanceID getInstanceId() {
     		return taskManagerConnection.getInstanceID();
     	}
     
    +	public void freeSlot() {
    +		Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
    +
    +		state = State.FREE;
    +		allocationId = null;
    +	}
    +
    +	public void clearPendingSlotRequest() {
    +		Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
    +
    +		state = State.FREE;
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
    +		Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
    +
    +		state = State.PENDING;
    +		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
    +	}
    +
    +	public void completeAllocation(AllocationID allocationId) {
    +		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
    +		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
    +
    +		state = State.ALLOCATED;
    +		this.allocationId = Preconditions.checkNotNull(allocationId);
    --- End diff --
    
    I think `Objects.equals` takes care of this.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145950011
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
     		return allocationId;
     	}
     
    -	public void setAllocationId(AllocationID allocationId) {
    -		this.allocationId = allocationId;
    -	}
    -
     	public PendingSlotRequest getAssignedSlotRequest() {
     		return assignedSlotRequest;
     	}
     
    -	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
    -		this.assignedSlotRequest = assignedSlotRequest;
    -	}
    -
     	public InstanceID getInstanceId() {
     		return taskManagerConnection.getInstanceID();
     	}
     
    +	public void freeSlot() {
    +		Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
    +
    +		state = State.FREE;
    +		allocationId = null;
    +	}
    +
    +	public void clearPendingSlotRequest() {
    +		Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
    +
    +		state = State.FREE;
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
    +		Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
    +
    +		state = State.PENDING;
    +		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
    +	}
    +
    +	public void completeAllocation(AllocationID allocationId) {
    +		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
    +		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
    +
    +		state = State.ALLOCATED;
    +		this.allocationId = Preconditions.checkNotNull(allocationId);
    --- End diff --
    
    But will move it to the top to make it clearer.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145948351
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
     		return allocationId;
     	}
     
    -	public void setAllocationId(AllocationID allocationId) {
    -		this.allocationId = allocationId;
    -	}
    -
     	public PendingSlotRequest getAssignedSlotRequest() {
     		return assignedSlotRequest;
     	}
     
    -	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
    -		this.assignedSlotRequest = assignedSlotRequest;
    -	}
    -
     	public InstanceID getInstanceId() {
     		return taskManagerConnection.getInstanceID();
     	}
     
    +	public void freeSlot() {
    +		Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
    +
    +		state = State.FREE;
    +		allocationId = null;
    +	}
    +
    +	public void clearPendingSlotRequest() {
    +		Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
    +
    +		state = State.FREE;
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
    +		Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
    +
    +		state = State.PENDING;
    +		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
    +	}
    +
    +	public void completeAllocation(AllocationID allocationId) {
    +		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
    +		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
    +
    +		state = State.ALLOCATED;
    +		this.allocationId = Preconditions.checkNotNull(allocationId);
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void updateAllocation(AllocationID allocationId) {
    --- End diff --
    
    `updateAllocation` is used to model the state transition from `FREE` to `ALLOCATED`. This state transition is necessary, because the ground truth is hold by the `TaskExecutor`. Thus, it can be the case that the `ResourceManager` receives a `SlotReport` where it says that slot `x` is allocated with this `allocationId`. This can be the case, for example, in the RM failover case.


---

[GitHub] flink issue #4823: [FLINK-7832] [flip6] Extend SlotManager to report free sl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4823
  
    Thanks for your review @zentol. I've addressed your comments and hope to have clarified the `updateAllocation` call for the state transition `FREE` -> `ALLOCATED`. 
    
    If there are not other objections, then I would like to merge this PR once Travis gives green light.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r144810955
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
     		return allocationId;
     	}
     
    -	public void setAllocationId(AllocationID allocationId) {
    -		this.allocationId = allocationId;
    -	}
    -
     	public PendingSlotRequest getAssignedSlotRequest() {
     		return assignedSlotRequest;
     	}
     
    -	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
    -		this.assignedSlotRequest = assignedSlotRequest;
    -	}
    -
     	public InstanceID getInstanceId() {
     		return taskManagerConnection.getInstanceID();
     	}
     
    +	public void freeSlot() {
    +		Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
    +
    +		state = State.FREE;
    +		allocationId = null;
    +	}
    +
    +	public void clearPendingSlotRequest() {
    +		Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
    +
    +		state = State.FREE;
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
    +		Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
    +
    +		state = State.PENDING;
    +		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
    +	}
    +
    +	public void completeAllocation(AllocationID allocationId) {
    +		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
    +		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
    +
    +		state = State.ALLOCATED;
    +		this.allocationId = Preconditions.checkNotNull(allocationId);
    --- End diff --
    
    The null check should be done before comparing the id to the requests allocation id.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145119294
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -556,14 +537,30 @@ private void registerSlot(
     	 * @return True if the slot could be updated; otherwise false
     	 */
     	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
    -		TaskManagerSlot slot = slots.get(slotId);
    +		final TaskManagerSlot slot = slots.get(slotId);
     
    -		if (null != slot) {
    -			// we assume the given allocation id to be the ground truth (coming from the TM)
    -			slot.setAllocationId(allocationId);
    +		if (slot != null) {
    +			final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
    +
    +			if (taskManagerRegistration != null) {
    +				updateSlotInternal(slot, taskManagerRegistration, allocationId);
    +
    +				return true;
    +			} else {
    +				throw new IllegalStateException("Trying to update a slot from a TaskManager " +
    +					slot.getInstanceId() + " which has not been registered.");
    +			}
    +		} else {
    +			LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
     
    -			if (null != allocationId) {
    -				if (slot.hasPendingSlotRequest()){
    +			return false;
    +		}
    +	}
    +
    +	private void updateSlotInternal(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
    --- End diff --
    
    isn't it odd to have an `updateSlotInternal` method, when a private `updateSlot` method already exists? (more or less a naming issue)


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145117281
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -556,14 +537,30 @@ private void registerSlot(
     	 * @return True if the slot could be updated; otherwise false
     	 */
     	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
    -		TaskManagerSlot slot = slots.get(slotId);
    +		final TaskManagerSlot slot = slots.get(slotId);
     
    -		if (null != slot) {
    -			// we assume the given allocation id to be the ground truth (coming from the TM)
    -			slot.setAllocationId(allocationId);
    +		if (slot != null) {
    +			final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
    +
    +			if (taskManagerRegistration != null) {
    +				updateSlotInternal(slot, taskManagerRegistration, allocationId);
    +
    +				return true;
    +			} else {
    +				throw new IllegalStateException("Trying to update a slot from a TaskManager " +
    --- End diff --
    
    same as above


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145948519
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) {
     		TaskManagerSlot slot = slots.get(slotId);
     
     		if (null != slot) {
    -			if (slot.isAllocated()) {
    +			if (slot.getState() == TaskManagerSlot.State.ALLOCATED) {
     				if (Objects.equals(allocationId, slot.getAllocationId())) {
    -					// free the slot
    -					slot.setAllocationId(null);
    -					fulfilledSlotRequests.remove(allocationId);
    -
    -					if (slot.isFree()) {
    -						handleFreeSlot(slot);
    -					}
     
     					TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
     
    -					if (null != taskManagerRegistration) {
    -						if (anySlotUsed(taskManagerRegistration.getSlots())) {
    -							taskManagerRegistration.markUsed();
    -						} else {
    -							taskManagerRegistration.markIdle();
    -						}
    +					if (taskManagerRegistration == null) {
    +						throw new IllegalStateException("Trying to free a slot from a TaskManager " +
    --- End diff --
    
    Yes, will add it.


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145958985
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
     		return allocationId;
     	}
     
    -	public void setAllocationId(AllocationID allocationId) {
    -		this.allocationId = allocationId;
    -	}
    -
     	public PendingSlotRequest getAssignedSlotRequest() {
     		return assignedSlotRequest;
     	}
     
    -	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
    -		this.assignedSlotRequest = assignedSlotRequest;
    -	}
    -
     	public InstanceID getInstanceId() {
     		return taskManagerConnection.getInstanceID();
     	}
     
    +	public void freeSlot() {
    +		Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
    +
    +		state = State.FREE;
    +		allocationId = null;
    +	}
    +
    +	public void clearPendingSlotRequest() {
    +		Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
    +
    +		state = State.FREE;
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
    +		Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
    +
    +		state = State.PENDING;
    +		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
    +	}
    +
    +	public void completeAllocation(AllocationID allocationId) {
    +		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
    +		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
    +
    +		state = State.ALLOCATED;
    +		this.allocationId = Preconditions.checkNotNull(allocationId);
    --- End diff --
    
    this wasn't about correctness but clarity, so it's good that you changed it :)


---

[GitHub] flink pull request #4823: [FLINK-7832] [flip6] Extend SlotManager to report ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4823#discussion_r145129240
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -74,22 +83,51 @@ public AllocationID getAllocationId() {
     		return allocationId;
     	}
     
    -	public void setAllocationId(AllocationID allocationId) {
    -		this.allocationId = allocationId;
    -	}
    -
     	public PendingSlotRequest getAssignedSlotRequest() {
     		return assignedSlotRequest;
     	}
     
    -	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
    -		this.assignedSlotRequest = assignedSlotRequest;
    -	}
    -
     	public InstanceID getInstanceId() {
     		return taskManagerConnection.getInstanceID();
     	}
     
    +	public void freeSlot() {
    +		Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it.");
    +
    +		state = State.FREE;
    +		allocationId = null;
    +	}
    +
    +	public void clearPendingSlotRequest() {
    +		Preconditions.checkState(state == State.PENDING, "No slot request to clear.");
    +
    +		state = State.FREE;
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
    +		Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request.");
    +
    +		state = State.PENDING;
    +		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
    +	}
    +
    +	public void completeAllocation(AllocationID allocationId) {
    +		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
    +		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
    +
    +		state = State.ALLOCATED;
    +		this.allocationId = Preconditions.checkNotNull(allocationId);
    +		assignedSlotRequest = null;
    +	}
    +
    +	public void updateAllocation(AllocationID allocationId) {
    --- End diff --
    
    Can you explain the difference between updateAllocation and completeAllocation in terms of when they are used? (I would've expected that the slot lifecycle is along the lines of FREE <-> PENDING -> ALLOCATED -> FREE), but this doesn't appear to be the case as a free slot can be allocated without ever being in a pending state).


---