Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/05/09 16:30:24 UTC

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5980

    [FLINK-9324] Wait for slot release before completing release future in SingleLogicalSlot

    ## What is the purpose of the change
    
    This commit defers completing the SingleLogicalSlot's release future until the
    SlotOwner has acknowledged the release. That way the ExecutionGraph will only
    recover after all of its slots have been returned to the SlotPool.
    
    As a side effect, the changes in this commit should reduce the number of redundant release
    calls sent to the SlotOwner which cluttered the debug logs.
    
    ## Brief change log
    
    - Simplify the `AllocatedSlot.Payload` interface
    - Don't require calls coming from the SlotPool to wait for the completion of the payload's terminal state future before releasing the slot. The idea is that the SlotPool knows when the slots on the TM are emptied, so it only needs to fail the payload.
    - Properly wait for the `SlotOwner` to acknowledge the release of the slot
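
The behaviour described in the change log can be illustrated with a minimal sketch. It is not the code from this pull request: `OwnerSketch`, `SlotSketch`, `returnSlot` and `ReleaseFutureDemo` are hypothetical names, and the slot is reduced to an id plus a release future. The only point it shows is that the release future stays incomplete until the owner's acknowledgement future completes, and that repeated release calls do not send the return message again.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the slot owner; not Flink's SlotOwner interface. */
interface OwnerSketch {
    /** Completes once the owner has processed the returned slot. */
    CompletableFuture<Boolean> returnSlot(String slotId);
}

/** Hypothetical, heavily simplified logical slot. */
final class SlotSketch {
    private final String slotId;
    private final OwnerSketch owner;
    private final AtomicBoolean releaseStarted = new AtomicBoolean(false);
    private final CompletableFuture<Void> releaseFuture = new CompletableFuture<>();

    SlotSketch(String slotId, OwnerSketch owner) {
        this.slotId = slotId;
        this.owner = owner;
    }

    /** Returns a future that completes only after the owner has answered the return request. */
    CompletableFuture<Void> releaseSlot() {
        // Only the first caller sends the return message; later callers share the same future.
        if (releaseStarted.compareAndSet(false, true)) {
            owner.returnSlot(slotId)
                .whenComplete((ack, failure) -> releaseFuture.complete(null));
        }
        return releaseFuture;
    }
}

final class ReleaseFutureDemo {
    public static void main(String[] args) {
        CompletableFuture<Boolean> ack = new CompletableFuture<>();
        SlotSketch slot = new SlotSketch("slot-1", id -> ack);

        CompletableFuture<Void> released = slot.releaseSlot();
        System.out.println("done before ack: " + released.isDone());
        ack.complete(true);
        System.out.println("done after ack: " + released.isDone());
    }
}
```

In this sketch the release future is completed normally even if the owner's future fails; whether a failure should be propagated instead is a design decision the sketch does not try to settle.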
    
    ## Verifying this change
    
    - Added `SingleLogicalSlotTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink waitForSlotRelease

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5980
    
----
commit 978e7ec2ca4d53f123c66ca01b65f24f905969c0
Author: Till Rohrmann <tr...@...>
Date:   2018-05-09T13:29:36Z

    [FLINK-9324] Wait for slot release before completing release future in SingleLogicalSlot
    
    This commit defers completing the SingleLogicalSlot's release future until the
    SlotOwner has acknowledged the release. That way the ExecutionGraph will only
    recover after all of its slots have been returned to the SlotPool.
    
    As a side effect, the changes in this commit should reduce the number of redundant release
    calls sent to the SlotOwner which cluttered the debug logs.

----


---

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

Posted by StephanEwen <gi...@git.apache.org>.
GitHub user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187189887
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
     	 * the logical slot.
     	 *
     	 * @param cause of the payload release
    -	 * @return true if the logical slot's payload could be released, otherwise false
     	 */
     	@Override
    -	public boolean release(Throwable cause) {
    -		return releaseSlot(cause).isDone();
    +	public void release(Throwable cause) {
    +		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +			signalPayloadRelease(cause);
    +		}
    +		state = State.RELEASED;
    +		releaseFuture.complete(null);
    +	}
    +
    +	private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +		tryAssignPayload(TERMINATED_PAYLOAD);
    +		payload.fail(cause);
    +
    +		return payload.getTerminalStateFuture();
    +	}
    +
    +	private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +		final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +			if (state == State.RELEASING) {
    +				return slotOwner.returnAllocatedSlot(this);
    +			} else {
    +				return CompletableFuture.completedFuture(true);
    +			}
    +		}).thenCompose(Function.identity());
    +
    +		slotReturnFuture.whenComplete(
    +			(Object ignored, Throwable throwable) -> {
    +				state = State.RELEASED;
    --- End diff --
    
    Mutating a private member field from within a lambda: would it make sense to have a (package) private method `markReleased()` or so?
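
A rough sketch of what such a helper might look like follows. It is an illustration only: the class name `MarkReleasedSketch`, the method `onSlotReturned` and the accessors are hypothetical, and the sketch assumes the callback should also complete the release future, which the quoted diff does not show.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified slot that keeps the state mutation in one named method. */
final class MarkReleasedSketch {
    enum State { ALIVE, RELEASING, RELEASED }

    private volatile State state = State.RELEASING;
    private final CompletableFuture<Void> releaseFuture = new CompletableFuture<>();

    /** Registers the callback; the lambda only delegates and touches no field directly. */
    void onSlotReturned(CompletableFuture<Boolean> slotReturnFuture) {
        slotReturnFuture.whenComplete((ignored, throwable) -> markReleased());
    }

    /** Single place where the slot moves to RELEASED (assumed to also complete the future). */
    void markReleased() {
        state = State.RELEASED;
        releaseFuture.complete(null);
    }

    State getState() {
        return state;
    }

    CompletableFuture<Void> getReleaseFuture() {
        return releaseFuture;
    }
}
```

Keeping the transition in one method gives the state change a name and a single place to add checks or logging later.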


---

[GitHub] flink issue #5980: [FLINK-9324] Wait for slot release before completing rele...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5980
  
    Thanks for the review @StephanEwen. I've addressed your comments and added some more tests. Once Travis passes, I'll merge it.


---

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

Posted by StephanEwen <gi...@git.apache.org>.
GitHub user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187191338
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
     	 * the logical slot.
     	 *
     	 * @param cause of the payload release
    -	 * @return true if the logical slot's payload could be released, otherwise false
     	 */
     	@Override
    -	public boolean release(Throwable cause) {
    -		return releaseSlot(cause).isDone();
    +	public void release(Throwable cause) {
    +		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +			signalPayloadRelease(cause);
    +		}
    +		state = State.RELEASED;
    +		releaseFuture.complete(null);
    +	}
    +
    +	private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +		tryAssignPayload(TERMINATED_PAYLOAD);
    +		payload.fail(cause);
    +
    +		return payload.getTerminalStateFuture();
    +	}
    +
    +	private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +		final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +			if (state == State.RELEASING) {
    --- End diff --
    
    What happens if this gets set concurrently to `RELEASED`? This would only work if `slotOwner.returnAllocatedSlot(this)` works in both cases (releasing and released), and the second path (returning the completed future) is just the optimization/fast path for when the slot is already released. (Double-checking the assumption.)
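
For context on the state handling in the quoted diff, here is a small sketch of a compare-and-set state transition. It assumes that `STATE_UPDATER` is an `AtomicReferenceFieldUpdater` over a volatile `state` field, which the diff excerpt does not show; the class `CasStateSketch` and its methods are hypothetical. The compare-and-set ensures that only one caller wins the `ALIVE` to `RELEASING` transition, whereas a plain write such as `state = State.RELEASED` can happen at any time, which is why a plain read of `state` in a callback may observe either value.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical sketch of a CAS-guarded state field; not the class from the pull request. */
final class CasStateSketch {
    enum State { ALIVE, RELEASING, RELEASED }

    // Assumption: the updater targets a volatile field named "state" in this class.
    private static final AtomicReferenceFieldUpdater<CasStateSketch, State> STATE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(CasStateSketch.class, State.class, "state");

    private volatile State state = State.ALIVE;

    /** Returns true only for the single caller that moves the slot from ALIVE to RELEASING. */
    boolean tryStartRelease() {
        return STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING);
    }

    /** Returns true only if the slot was still RELEASING when this call ran. */
    boolean tryMarkReleased() {
        return STATE_UPDATER.compareAndSet(this, State.RELEASING, State.RELEASED);
    }

    State getState() {
        return state;
    }
}
```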


---

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187305983
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
     	 * the logical slot.
     	 *
     	 * @param cause of the payload release
    -	 * @return true if the logical slot's payload could be released, otherwise false
     	 */
     	@Override
    -	public boolean release(Throwable cause) {
    -		return releaseSlot(cause).isDone();
    +	public void release(Throwable cause) {
    +		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +			signalPayloadRelease(cause);
    +		}
    +		state = State.RELEASED;
    +		releaseFuture.complete(null);
    +	}
    +
    +	private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +		tryAssignPayload(TERMINATED_PAYLOAD);
    +		payload.fail(cause);
    +
    +		return payload.getTerminalStateFuture();
    +	}
    +
    +	private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +		final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +			if (state == State.RELEASING) {
    +				return slotOwner.returnAllocatedSlot(this);
    +			} else {
    +				return CompletableFuture.completedFuture(true);
    +			}
    +		}).thenCompose(Function.identity());
    +
    +		slotReturnFuture.whenComplete(
    +			(Object ignored, Throwable throwable) -> {
    +				state = State.RELEASED;
    --- End diff --
    
    Yes, this is cleaner.


---

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187307945
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
     	 * the logical slot.
     	 *
     	 * @param cause of the payload release
    -	 * @return true if the logical slot's payload could be released, otherwise false
     	 */
     	@Override
    -	public boolean release(Throwable cause) {
    -		return releaseSlot(cause).isDone();
    +	public void release(Throwable cause) {
    +		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +			signalPayloadRelease(cause);
    +		}
    +		state = State.RELEASED;
    +		releaseFuture.complete(null);
    +	}
    +
    +	private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +		tryAssignPayload(TERMINATED_PAYLOAD);
    +		payload.fail(cause);
    +
    +		return payload.getTerminalStateFuture();
    +	}
    +
    +	private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +		final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +			if (state == State.RELEASING) {
    --- End diff --
    
    After thinking about this part, I think even a double check won't strictly prevent this from happening. The problem is that we don't know when the `SlotOwner` processes the return slot message. The following could happen: we send the release message and see afterwards that the state is still `RELEASING`. Before the `SlotOwner` processes the release slot message, it triggers the `AllocatedSlot.Payload#release` call, which releases the slot from the `SlotOwner`'s side. Only after this has happened does the owner process the release message. Consequently, the slot owner has to work in both cases (`RELEASING` and `RELEASED`). The double check only makes this less likely to happen.
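
One way to read the requirement that the slot owner has to work in both cases is that returning a slot must be idempotent on the owner's side. The sketch below illustrates that idea under stated assumptions: `IdempotentOwnerSketch` and all of its methods are hypothetical, slots are reduced to string ids, and acknowledging an already released slot with `true` is an assumption rather than documented behaviour of Flink's `SlotOwner`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical owner whose return operation tolerates slots it has already released itself. */
final class IdempotentOwnerSketch {
    private final Set<String> allocated = new HashSet<>();

    void allocate(String slotId) {
        allocated.add(slotId);
    }

    /** Owner-initiated release, e.g. because the owner learned that the slot is gone. */
    void releaseFromOwnerSide(String slotId) {
        allocated.remove(slotId);
    }

    /** Handles a return message; removing an unknown id is a no-op, so late messages are harmless. */
    CompletableFuture<Boolean> returnAllocatedSlot(String slotId) {
        allocated.remove(slotId);
        // Assumption: a slot that is already gone is still acknowledged as returned.
        return CompletableFuture.completedFuture(true);
    }

    boolean isAllocated(String slotId) {
        return allocated.contains(slotId);
    }
}
```

With an owner like this, the interleaving described above (owner-side release first, return message second) ends in the same state as the reverse order.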


---

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5980


---

[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187306715
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
     	 * the logical slot.
     	 *
     	 * @param cause of the payload release
    -	 * @return true if the logical slot's payload could be released, otherwise false
     	 */
     	@Override
    -	public boolean release(Throwable cause) {
    -		return releaseSlot(cause).isDone();
    +	public void release(Throwable cause) {
    +		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +			signalPayloadRelease(cause);
    +		}
    +		state = State.RELEASED;
    +		releaseFuture.complete(null);
    +	}
    +
    +	private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +		tryAssignPayload(TERMINATED_PAYLOAD);
    +		payload.fail(cause);
    +
    +		return payload.getTerminalStateFuture();
    +	}
    +
    +	private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +		final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +			if (state == State.RELEASING) {
    --- End diff --
    
    True, this relies on a logical coupling, which currently holds. I'll add the double check to make the code independent of this assumption.


---