You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:07:55 UTC

[3/3] flink git commit: [FLINK-8085] Thin out LogicalSlot interface

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the LogicalSlot from the Execution by
introducing a Payload interface which is assigned to a LogicalSlot. The Payload interface
is implemented by the Execution and allows failing the payload as well as obtaining
its terminal state future.

Introduce a proper Execution#releaseFuture, which is completed once the Execution's
assigned resource has been released.

This closes #5087.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bca9e46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bca9e46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bca9e46

Branch: refs/heads/master
Commit: 7bca9e4613ff30ab6a9c11e673a785f3f5c86e69
Parents: bb9c64b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 15 14:20:27 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:37:50 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  83 +++++++++------
 .../runtime/executiongraph/ExecutionGraph.java  |  31 ++++--
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../failover/RestartIndividualStrategy.java     |   2 +-
 .../apache/flink/runtime/instance/Instance.java |  33 +++---
 .../flink/runtime/instance/LogicalSlot.java     |  54 +++++++---
 .../flink/runtime/instance/SharedSlot.java      |   2 +-
 .../flink/runtime/instance/SimpleSlot.java      | 101 ++++++++++++-------
 .../org/apache/flink/runtime/instance/Slot.java |   5 +-
 .../apache/flink/runtime/instance/SlotPool.java |   8 +-
 .../instance/SlotSharingGroupAssignment.java    |   8 +-
 .../scheduler/CoLocationConstraint.java         |   2 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  10 +-
 .../runtime/jobmanager/slots/SlotOwner.java     |  10 +-
 .../ExecutionGraphRestartTest.java              |  12 +--
 .../ExecutionGraphSchedulingTest.java           |  24 ++++-
 .../ExecutionGraphSuspendTest.java              |   2 +-
 .../runtime/executiongraph/ExecutionTest.java   |  75 ++++++++++++--
 .../ExecutionVertexCancelTest.java              |   2 +-
 .../ExecutionVertexSchedulingTest.java          |   4 +-
 .../executiongraph/GlobalModVersionTest.java    |   2 +-
 .../IndividualRestartsConcurrencyTest.java      |   2 +-
 .../PipelinedRegionFailoverConcurrencyTest.java |   2 +-
 .../utils/SimpleSlotProvider.java               |   4 +-
 .../flink/runtime/instance/InstanceTest.java    |  16 +--
 .../flink/runtime/instance/SharedSlotsTest.java |  50 ++++-----
 .../flink/runtime/instance/SimpleSlotTest.java  |  66 ++++--------
 .../flink/runtime/instance/SlotPoolTest.java    |   6 +-
 .../runtime/instance/TestingLogicalSlot.java    |  30 +++---
 .../flink/runtime/instance/TestingPayload.java  |  44 ++++++++
 .../scheduler/CoLocationConstraintTest.java     |   2 +-
 .../scheduler/SchedulerIsolatedTasksTest.java   |   9 +-
 .../jobmanager/slots/TestingSlotOwner.java      |  47 +++++++++
 .../runtime/testingUtils/TestingUtils.scala     |   5 +-
 34 files changed, 497 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 00a452d..12a6749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -93,7 +93,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution implements AccessExecution, Archiveable<ArchivedExecution> {
+public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -135,7 +135,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
 	/** A future that completes once the Execution reaches a terminal ExecutionState */
-	private final CompletableFuture<ExecutionState> terminationFuture;
+	private final CompletableFuture<ExecutionState> terminalStateFuture;
+
+	private final CompletableFuture<?> releaseFuture;
 
 	private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
 
@@ -197,7 +199,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
-		this.terminationFuture = new CompletableFuture<>();
+		this.terminalStateFuture = new CompletableFuture<>();
+		this.releaseFuture = new CompletableFuture<>();
 		this.taskManagerLocationFuture = new CompletableFuture<>();
 
 		this.assignedResource = null;
@@ -329,10 +332,21 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Gets a future that completes once the task execution reaches a terminal state.
 	 * The future will be completed with specific state that the execution reached.
 	 *
-	 * @return A future for the execution's termination
+	 * @return A future which is completed once the execution reaches a terminal state
+	 */
+	@Override
+	public CompletableFuture<ExecutionState> getTerminalStateFuture() {
+		return terminalStateFuture;
+	}
+
+	/**
+	 * Gets the release future which is completed once the execution reaches a terminal
+	 * state and the assigned resource has been released.
+	 *
+	 * @return A future which is completed once the assigned resource has been released
 	 */
-	public CompletableFuture<ExecutionState> getTerminationFuture() {
-		return terminationFuture;
+	public CompletableFuture<?> getReleaseFuture() {
+		return releaseFuture;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -493,7 +507,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 		try {
 			// good, we are allowed to deploy
-			if (!slot.setExecution(this)) {
+			if (!slot.tryAssignPayload(this)) {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 
@@ -608,15 +622,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
 
-						final LogicalSlot slot = assignedResource;
-
-						if (slot != null) {
-							slot.releaseSlot();
-						}
+						releaseAssignedResource();
 					}
 					finally {
 						vertex.executionCanceled(this);
-						terminationFuture.complete(CANCELED);
 					}
 					return;
 				}
@@ -757,6 +766,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *
 	 * @param t The exception that caused the task to fail.
 	 */
+	@Override
 	public void fail(Throwable t) {
 		processFail(t, false);
 	}
@@ -880,17 +890,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-						final LogicalSlot slot = assignedResource;
-
-						if (slot != null) {
-							slot.releaseSlot();
-						}
+						releaseAssignedResource();
 
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
 						vertex.executionFinished(this);
-						terminationFuture.complete(FINISHED);
 					}
 					return;
 				}
@@ -938,17 +943,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				if (transitionState(current, CANCELED)) {
 					try {
-						final LogicalSlot slot = assignedResource;
-
-						if (slot != null) {
-							slot.releaseSlot();
-						}
+						releaseAssignedResource();
 
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
 						vertex.executionCanceled(this);
-						terminationFuture.complete(CANCELED);
 					}
 					return;
 				}
@@ -1035,15 +1035,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				try {
-					final LogicalSlot slot = assignedResource;
-					if (slot != null) {
-						slot.releaseSlot();
-					}
+					releaseAssignedResource();
 					vertex.getExecutionGraph().deregisterExecution(this);
 				}
 				finally {
 					vertex.executionFailed(this, t);
-					terminationFuture.complete(FAILED);
 				}
 
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
@@ -1177,6 +1173,28 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
+	/**
+	 * Releases the assigned resource and completes the release future
+	 * once the assigned resource has been successfully released
+	 */
+	private void releaseAssignedResource() {
+		final LogicalSlot slot = assignedResource;
+
+		if (slot != null) {
+			slot.releaseSlot().whenComplete(
+				(Object ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						releaseFuture.completeExceptionally(throwable);
+					} else {
+						releaseFuture.complete(null);
+					}
+				});
+		} else {
+			// no assigned resource --> we can directly complete the release future
+			releaseFuture.complete(null);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Miscellaneous
 	// --------------------------------------------------------------------------------------------
@@ -1240,6 +1258,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error);
 			}
 
+			if (targetState.isTerminal()) {
+				// complete the terminal state future
+				terminalStateFuture.complete(targetState);
+			}
+
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners may not affect the main logic)
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 002f9a0..c4ff6fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -965,11 +965,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 					// we build a future that is complete once all vertices have reached a terminal state
 					final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
-					allTerminal.thenAccept(
-						(Void value) -> {
-							// cancellations may currently be overridden by failures which trigger
-							// restarts, so we need to pass a proper restart global version here
-							allVerticesInTerminalState(globalVersionForRestart);
+					allTerminal.whenComplete(
+						(Void value, Throwable throwable) -> {
+							if (throwable != null) {
+								transitionState(
+									JobStatus.CANCELLING,
+									JobStatus.FAILED,
+									new FlinkException(
+										"Could not cancel job " + getJobName() + " because not all execution job vertices could be cancelled.",
+										throwable));
+							} else {
+								// cancellations may currently be overridden by failures which trigger
+								// restarts, so we need to pass a proper restart global version here
+								allVerticesInTerminalState(globalVersionForRestart);
+							}
 						}
 					);
 
@@ -1125,7 +1134,17 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 
 				final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
-				allTerminal.thenAccept((Void value) -> allVerticesInTerminalState(globalVersionForRestart));
+				allTerminal.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							transitionState(
+								JobStatus.FAILING,
+								JobStatus.FAILED,
+								new FlinkException("Could not cancel all execution job vertices properly.", throwable));
+						} else {
+							allVerticesInTerminalState(globalVersionForRestart);
+						}
+					});
 
 				return;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c2c986f..27f2d5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -631,12 +631,12 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 *  
 	 * @return A future that completes once the execution has reached its final state.
 	 */
-	public CompletableFuture<ExecutionState> cancel() {
+	public CompletableFuture<?> cancel() {
 		// to avoid any case of mixup in the presence of concurrent calls,
 		// we copy a reference to the stack to make sure both calls go to the same Execution 
 		final Execution exec = this.currentExecution;
 		exec.cancel();
-		return exec.getTerminationFuture();
+		return exec.getReleaseFuture();
 	}
 
 	public void stop() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
index 80f1d2f..f133d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -106,7 +106,7 @@ public class RestartIndividualStrategy extends FailoverStrategy {
 		// Note: currently all tasks passed here are already in their terminal state,
 		//       so we could actually avoid the future. We use it anyways because it is cheap and
 		//       it helps to support better testing
-		final CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+		final CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminalStateFuture();
 
 		final ExecutionVertex vertexToRecover = taskExecution.getVertex(); 
 		final long globalModVersion = taskExecution.getGlobalModVersion();

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index b5c6f23..d099f6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,26 +18,27 @@
 
 package org.apache.flink.runtime.instance;
 
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
@@ -164,7 +165,7 @@ public class Instance implements SlotOwner {
 		 * the instance lock
 		 */
 		for (Slot slot : slots) {
-			slot.releaseSlot();
+			slot.releaseInstanceSlot();
 		}
 	}
 
@@ -285,10 +286,10 @@ public class Instance implements SlotOwner {
 	 * "released", this method will do nothing.</p>
 	 * 
 	 * @param slot The slot to return.
-	 * @return True, if the slot was returned, false if not.
+	 * @return Future which is completed with true, if the slot was returned, false if not.
 	 */
 	@Override
-	public boolean returnAllocatedSlot(Slot slot) {
+	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
 		checkNotNull(slot);
 		checkArgument(!slot.isAlive(), "slot is still alive");
 		checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
@@ -297,7 +298,7 @@ public class Instance implements SlotOwner {
 			LOG.debug("Return allocated slot {}.", slot);
 			synchronized (instanceLock) {
 				if (isDead) {
-					return false;
+					return CompletableFuture.completedFuture(false);
 				}
 
 				if (this.allocatedSlots.remove(slot)) {
@@ -307,7 +308,7 @@ public class Instance implements SlotOwner {
 						this.slotAvailabilityListener.newSlotAvailable(this);
 					}
 
-					return true;
+					return CompletableFuture.completedFuture(true);
 				}
 				else {
 					throw new IllegalArgumentException("Slot was not allocated from this TaskManager.");
@@ -315,7 +316,7 @@ public class Instance implements SlotOwner {
 			}
 		}
 		else {
-			return false;
+			return CompletableFuture.completedFuture(false);
 		}
 	}
 
@@ -327,7 +328,7 @@ public class Instance implements SlotOwner {
 		}
 
 		for (Slot slot : copy) {
-			slot.releaseSlot();
+			slot.releaseInstanceSlot();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
index 3ebe107..e663265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
 /**
  * A logical slot represents a resource on a TaskManager into
  * which a single task can be deployed.
@@ -51,31 +54,29 @@ public interface LogicalSlot {
 	boolean isAlive();
 
 	/**
-	 * True if the slot is canceled.
-	 *
-	 * @return True if the slot is canceled, otherwise false
-	 */
-	boolean isCanceled();
-
-	/**
-	 * True if the slot is released.
+	 * Tries to assign a payload to this slot. This can only happens
+	 * exactly once.
 	 *
-	 * @return True if the slot is released, otherwise false
+	 * @param payload to be assigned to this slot.
+	 * @return true if the payload could be set, otherwise false
 	 */
-	boolean isReleased();
+	boolean tryAssignPayload(Payload payload);
 
 	/**
-	 * Sets the execution for this slot.
+	 * Returns the set payload or null if none.
 	 *
-	 * @param execution to set for this slot
-	 * @return true if the slot could be set, otherwise false
+	 * @return Payload of this slot of null if none
 	 */
-	boolean setExecution(Execution execution);
+	@Nullable
+	Payload getPayload();
 
 	/**
 	 * Releases this slot.
+	 *
+	 * @return Future which is completed once the slot has been released,
+	 * 		in case of a failure it is completed exceptionally
 	 */
-	void releaseSlot();
+	CompletableFuture<?> releaseSlot();
 
 	/**
 	 * Gets the slot number on the TaskManager.
@@ -90,4 +91,25 @@ public interface LogicalSlot {
 	 * @return allocation id of this slot
 	 */
 	AllocationID getAllocationId();
+
+	/**
+	 * Payload for a logical slot.
+	 */
+	interface Payload {
+
+		/**
+		 * Fail the payload with the given cause.
+		 *
+		 * @param cause of the failure
+		 */
+		void fail(Throwable cause);
+
+		/**
+		 * Gets the terminal state future which is completed once the payload
+		 * has reached a terminal state.
+		 *
+		 * @return Terminal state future
+		 */
+		CompletableFuture<?> getTerminalStateFuture();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 106a8ef..2ce4fc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -188,7 +188,7 @@ public class SharedSlot extends Slot {
 	}
 	
 	@Override
-	public void releaseSlot() {
+	public void releaseInstanceSlot() {
 		assignmentGroup.releaseSharedSlot(this);
 		
 		if (!(isReleased() && subSlots.isEmpty())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 9591028..0c9e11c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,18 +18,21 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
@@ -40,14 +43,16 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
  */
 public class SimpleSlot extends Slot implements LogicalSlot {
 
-	/** The updater used to atomically swap in the execution */
-	private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Execution.class, "executedTask");
+	/** The updater used to atomically swap in the payload */
+	private static final AtomicReferenceFieldUpdater<SimpleSlot, Payload> PAYLOAD_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Payload.class, "payload");
 
 	// ------------------------------------------------------------------------
 
-	/** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
-	private volatile Execution executedTask;
+	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
+	/** Id of the task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
+	private volatile Payload payload;
 
 	/** The locality attached to the slot, defining whether the slot was allocated at the desired location. */
 	private volatile Locality locality = Locality.UNCONSTRAINED;
@@ -149,26 +154,14 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	}
 
 	/**
-	 * Gets the task execution attempt currently executed in this slot. This may return null, if no
-	 * task execution attempt has been placed into this slot.
-	 *
-	 * @return The slot's task execution attempt, or null, if no task is executed in this slot, yet.
-	 */
-	public Execution getExecutedVertex() {
-		return executedTask;
-	}
-
-	/**
 	 * Atomically sets the executed vertex, if no vertex has been assigned to this slot so far.
 	 *
-	 * @param executedVertex The vertex to assign to this slot.
+	 * @param payload The vertex to assign to this slot.
 	 * @return True, if the vertex was assigned, false, otherwise.
 	 */
 	@Override
-	public boolean setExecution(Execution executedVertex) {
-		if (executedVertex == null) {
-			throw new NullPointerException();
-		}
+	public boolean tryAssignPayload(Payload payload) {
+		Preconditions.checkNotNull(payload);
 
 		// check that we can actually run in this slot
 		if (isCanceled()) {
@@ -176,19 +169,25 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 		}
 
 		// atomically assign the vertex
-		if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
+		if (!PAYLOAD_UPDATER.compareAndSet(this, null, payload)) {
 			return false;
 		}
 
 		// we need to do a double check that we were not cancelled in the meantime
 		if (isCanceled()) {
-			this.executedTask = null;
+			this.payload = null;
 			return false;
 		}
 
 		return true;
 	}
 
+	@Nullable
+	@Override
+	public Payload getPayload() {
+		return payload;
+	}
+
 	/**
 	 * Gets the locality information attached to this slot.
 	 * @return The locality attached to the slot.
@@ -210,27 +209,51 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void releaseSlot() {
+	public void releaseInstanceSlot() {
+		releaseSlot();
+	}
+
+	@Override
+	public CompletableFuture<?> releaseSlot() {
 		if (!isCanceled()) {
+			final CompletableFuture<?> terminationFuture;
 
-			// kill all tasks currently running in this slot
-			Execution exec = this.executedTask;
-			if (exec != null && !exec.isFinished()) {
-				exec.fail(new Exception("TaskManager was lost/killed: " + getTaskManagerLocation()));
-			}
+			if (payload != null) {
+				// trigger the failure of the slot payload
+				payload.fail(new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));
 
-			// release directly (if we are directly allocated),
-			// otherwise release through the parent shared slot
-			if (getParent() == null) {
-				// we have to give back the slot to the owning instance
-				if (markCancelled()) {
-					getOwner().returnAllocatedSlot(this);
-				}
+				// wait for the termination of the payload before releasing the slot
+				terminationFuture = payload.getTerminalStateFuture();
 			} else {
-				// we have to ask our parent to dispose us
-				getParent().releaseChild(this);
+				terminationFuture = CompletableFuture.completedFuture(null);
 			}
+
+			terminationFuture.whenComplete(
+				(Object ignored, Throwable throwable) -> {
+					// release directly (if we are directly allocated),
+					// otherwise release through the parent shared slot
+					if (getParent() == null) {
+						// we have to give back the slot to the owning instance
+						if (markCancelled()) {
+							getOwner().returnAllocatedSlot(this).whenComplete(
+								(value, t) -> {
+									if (t != null) {
+										releaseFuture.completeExceptionally(t);
+									} else {
+										releaseFuture.complete(null);
+									}
+								});
+						}
+					} else {
+						// we have to ask our parent to dispose us
+						getParent().releaseChild(this);
+
+						releaseFuture.complete(null);
+					}
+				});
 		}
+
+		return releaseFuture;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index d6d8f12..804682b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -26,9 +27,9 @@ import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.api.common.JobID;
 
 import javax.annotation.Nullable;
+
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -343,7 +344,7 @@ public abstract class Slot {
 	 * If this slot is a simple slot, it will be returned to its instance. If it is a shared slot,
 	 * it will release all of its sub-slots and release itself.
 	 */
-	public abstract void releaseSlot();
+	public abstract void releaseInstanceSlot();
 
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 66af865..771d690 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -643,7 +643,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			if (slot != null) {
 				// release the slot.
 				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
-				slot.releaseSlot();
+				slot.releaseInstanceSlot();
 			}
 			else {
 				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
@@ -683,7 +683,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 			final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
 			for (Slot slot : allocatedSlotsForResource) {
-				slot.releaseSlot();
+				slot.releaseInstanceSlot();
 			}
 		}
 
@@ -1081,9 +1081,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@Override
-		public boolean returnAllocatedSlot(Slot slot) {
+		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
 			gateway.returnAllocatedSlot(slot);
-			return true;
+			return CompletableFuture.completedFuture(true);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 4371290..45b4a96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -215,7 +215,7 @@ public class SlotSharingGroupAssignment {
 						// note that this does implicitly release the slot we have just added
 						// as well, because we release its last child slot. That is expected
 						// and desired.
-						constraintGroupSlot.releaseSlot();
+						constraintGroupSlot.releaseInstanceSlot();
 					}
 				}
 				else {
@@ -507,7 +507,7 @@ public class SlotSharingGroupAssignment {
 	}
 
 	/**
-	 * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot()}.
+	 * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseInstanceSlot()}.
 	 * 
 	 * @param sharedSlot The slot to be released.
 	 */
@@ -520,7 +520,7 @@ public class SlotSharingGroupAssignment {
 					// by simply releasing all children, we should eventually release this slot.
 					Set<Slot> children = sharedSlot.getSubSlots();
 					while (children.size() > 0) {
-						children.iterator().next().releaseSlot();
+						children.iterator().next().releaseInstanceSlot();
 					}
 				}
 				else {
@@ -551,7 +551,7 @@ public class SlotSharingGroupAssignment {
 		if (parent == null) {
 			// root slot, return to the instance.
 			sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
-			
+
 			// also, make sure we remove this slot from everywhere
 			allSlots.remove(sharedSlot);
 			removeSlotFromAllEntries(availableSlotsPerJid, sharedSlot);

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index c41f7bf..ffc1a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -141,7 +141,7 @@ public class CoLocationConstraint {
 						"Cannot assign different location to a constraint whose location is locked.");
 			}
 			if (this.sharedSlot.isAlive()) {
-				this.sharedSlot.releaseSlot();
+				this.sharedSlot.releaseInstanceSlot();
 			}
 
 			this.sharedSlot = newSlot;

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 2715146..8857be7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -274,7 +274,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						// if there is no slot from the group, or the new slot is local,
 						// then we use the new slot
 						if (slotFromGroup != null) {
-							slotFromGroup.releaseSlot();
+							slotFromGroup.releaseInstanceSlot();
 						}
 						toUse = newSlot;
 					}
@@ -282,7 +282,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						// both are available and usable. neither is local. in that case, we may
 						// as well use the slot from the sharing group, to minimize the number of
 						// instances that the job occupies
-						newSlot.releaseSlot();
+						newSlot.releaseInstanceSlot();
 						toUse = slotFromGroup;
 					}
 
@@ -299,10 +299,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				}
 				catch (Throwable t) {
 					if (slotFromGroup != null) {
-						slotFromGroup.releaseSlot();
+						slotFromGroup.releaseInstanceSlot();
 					}
 					if (newSlot != null) {
-						newSlot.releaseSlot();
+						newSlot.releaseInstanceSlot();
 					}
 
 					ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
@@ -444,7 +444,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 					}
 					else {
 						// could not add and allocate the sub-slot, so release shared slot
-						sharedSlot.releaseSlot();
+						sharedSlot.releaseInstanceSlot();
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
index ad9c784..cb4488d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
@@ -20,10 +20,18 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.runtime.instance.Slot;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Interface for components that hold slots and to which slots get released / recycled.
  */
 public interface SlotOwner {
 
-	boolean returnAllocatedSlot(Slot slot);
+	/**
+	 * Return the given slot to the slot owner.
+	 *
+	 * @param slot to return
+	 * @return Future which is completed with true if the slot could be returned, otherwise with false
+	 */
+	CompletableFuture<Boolean> returnAllocatedSlot(Slot slot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 8770b06..80df852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -64,9 +64,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Test;
 
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
@@ -79,6 +76,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
@@ -90,10 +90,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.spy;
 
-
 public class ExecutionGraphRestartTest extends TestLogger {
 
 	private final static int NUM_TASKS = 31;
@@ -473,7 +471,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
 		}
 
-		assertEquals(JobStatus.CANCELED, eg.getState());
+		assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
 
 		Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 71ca3a5..2e6558a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -52,12 +53,15 @@ import org.junit.Test;
 import org.mockito.verification.Timeout;
 
 import java.net.InetAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -278,7 +282,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		//  Create the slots, futures, and the slot provider
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
-		final SlotOwner slotOwner = mock(SlotOwner.class);
+		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+		slotOwner.setReturnAllocatedSlotConsumer(
+			(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
 
 		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -324,7 +331,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
 
 		// wait until all slots are back
-		verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class));
+		for (int i = 0; i < parallelism; i++) {
+			returnedSlots.poll(2000L, TimeUnit.MILLISECONDS);
+		}
 
 		// no deployment calls must have happened
 		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
@@ -354,7 +363,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
 
-		final SlotOwner slotOwner = mock(SlotOwner.class);
+		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+		slotOwner.setReturnAllocatedSlotConsumer(
+			(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -388,14 +400,16 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
 
 		// wait until all slots are back
-		verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class));
+		for (int i = 0; i < parallelism - 1; i++) {
+			returnedSlots.poll(2000, TimeUnit.MILLISECONDS);
+		}
 
 		//  verify that no deployments have happened
 		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
 
 		for (CompletableFuture<LogicalSlot> future : slotFutures) {
 			if (future.isDone()) {
-				assertTrue(future.get().isCanceled());
+				assertFalse(future.get().isAlive());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index f0adc32..65a52bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -219,7 +219,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 		validateCancelRpcCalls(gateway, parallelism);
 
 		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
-		assertEquals(JobStatus.CANCELED, eg.getState());
+		assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
 
 		// suspend
 		eg.suspend(new Exception("suspend"));

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 43a6432..c6fb836 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -77,7 +78,7 @@ public class ExecutionTest extends TestLogger {
 
 		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
 
-		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
 			new JobID(),
@@ -117,7 +118,7 @@ public class ExecutionTest extends TestLogger {
 		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
 		jobVertex.setInvokableClass(NoOpInvokable.class);
 
-		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
 			new JobID(),
@@ -167,7 +168,7 @@ public class ExecutionTest extends TestLogger {
 		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
 		jobVertex.setInvokableClass(NoOpInvokable.class);
 
-		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
 			new JobID(),
@@ -269,9 +270,71 @@ public class ExecutionTest extends TestLogger {
 	}
 
 	/**
+	 * Checks that the {@link Execution} termination future is only completed after the
+	 * assigned slot has been released.
+	 *
+	 * <p>NOTE: This test only fails spuriously without the fix of this commit. Thus, one has
+	 * to execute this test multiple times to see the failure.
+	 */
+	@Test
+	public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+		assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY));
+
+		Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
+
+		CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
+		CompletableFuture<?> terminationFuture = executionVertex.cancel();
+
+		// run canceling in a separate thread to allow an interleaving between termination
+		// future callback registrations
+		CompletableFuture.runAsync(
+			() -> currentExecutionAttempt.cancelingComplete(),
+			TestingUtils.defaultExecutor());
+
+		// to increase probability for problematic interleaving, let the current thread yield the processor
+		Thread.yield();
+
+		CompletableFuture<Boolean> restartFuture = terminationFuture.thenApply(
+			ignored -> {
+				assertTrue(returnedSlotFuture.isDone());
+				return true;
+			});
+
+
+		// check if the returned slot future was completed first
+		restartFuture.get();
+	}
+
+	/**
 	 * Slot owner which records the first returned slot.
 	 */
-	public static final class TestingSlotOwner implements SlotOwner {
+	private static final class SingleSlotTestingSlotOwner implements SlotOwner {
 
 		final CompletableFuture<Slot> returnedSlot = new CompletableFuture<>();
 
@@ -280,8 +343,8 @@ public class ExecutionTest extends TestLogger {
 		}
 
 		@Override
-		public boolean returnAllocatedSlot(Slot slot) {
-			return returnedSlot.complete(slot);
+		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+			return CompletableFuture.completedFuture(returnedSlot.complete(slot));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index cb31d15..1b8daca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -423,7 +423,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 		exec.markFailed(new Exception("test"));
 		assertTrue(exec.getState() == ExecutionState.FAILED || exec.getState() == ExecutionState.CANCELED);
 
-		assertTrue(exec.getAssignedResource().isCanceled());
+		assertFalse(exec.getAssignedResource().isAlive());
 		assertEquals(vertices.length - 1, exec.getVertex().getExecutionGraph().getRegisteredExecutions().size());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 27f7f51..2941739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -59,7 +59,7 @@ public class ExecutionVertexSchedulingTest {
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 			
-			slot.releaseSlot();
+			slot.releaseInstanceSlot();
 			assertTrue(slot.isReleased());
 
 			Scheduler scheduler = mock(Scheduler.class);
@@ -91,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
-			slot.releaseSlot();
+			slot.releaseInstanceSlot();
 			assertTrue(slot.isReleased());
 
 			final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index 534c33d..bfad327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -88,7 +88,7 @@ public class GlobalModVersionTest extends TestLogger {
 			exec.cancelingComplete();
 		}
 
-		assertEquals(JobStatus.CANCELED, graph.getState());
+		assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());
 
 		// no failure notification at all
 		verify(mockStrategy, times(0)).onTaskFailure(any(Execution.class), any(Throwable.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index c977503..85a6c2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -139,7 +139,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
 		// now report that cancelling is complete for the other vertex
 		vertex2.getCurrentExecutionAttempt().cancelingComplete();
 
-		assertEquals(JobStatus.CANCELED, graph.getState());
+		assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());
 		assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
 		assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 124a5b7..656c372 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -113,7 +113,7 @@ public class PipelinedRegionFailoverConcurrencyTest extends TestLogger {
 		// now report that cancelling is complete for the other vertex
 		vertex2.getCurrentExecutionAttempt().cancelingComplete();
 
-		assertEquals(JobStatus.CANCELED, graph.getState());
+		assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());
 		assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
 		assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 3d28983..14e0e66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -96,11 +96,11 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public boolean returnAllocatedSlot(Slot slot) {
+	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
 		synchronized (slots) {
 			slots.add(slot.getAllocatedSlot());
 		}
-		return true;
+		return CompletableFuture.completedFuture(true);
 	}
 
 	public int getNumberOfAvailableSlots() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 2bea039..5b85f72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -79,18 +79,18 @@ public class InstanceTest {
 			}
 
 			// release the slots. this returns them to the instance
-			slot1.releaseSlot();
-			slot2.releaseSlot();
-			slot3.releaseSlot();
-			slot4.releaseSlot();
+			slot1.releaseInstanceSlot();
+			slot2.releaseInstanceSlot();
+			slot3.releaseInstanceSlot();
+			slot4.releaseInstanceSlot();
 
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 
-			assertFalse(instance.returnAllocatedSlot(slot1));
-			assertFalse(instance.returnAllocatedSlot(slot2));
-			assertFalse(instance.returnAllocatedSlot(slot3));
-			assertFalse(instance.returnAllocatedSlot(slot4));
+			assertFalse(instance.returnAllocatedSlot(slot1).get());
+			assertFalse(instance.returnAllocatedSlot(slot2).get());
+			assertFalse(instance.returnAllocatedSlot(slot3).get());
+			assertFalse(instance.returnAllocatedSlot(slot4).get());
 
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 4a6bf75..1e2b6af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -80,7 +80,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(0, slot.getRootSlotNumber());
 			
 			// release the slot immediately.
-			slot.releaseSlot();
+			slot.releaseInstanceSlot();
 
 			assertTrue(slot.isCanceled());
 			assertTrue(slot.isReleased());
@@ -129,7 +129,7 @@ public class SharedSlotsTest extends TestLogger {
 			SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, vid1);
 			assertNotNull(sub1);
 
-			assertNull(sub1.getExecutedVertex());
+			assertNull(sub1.getPayload());
 			assertEquals(Locality.LOCAL, sub1.getLocality());
 			assertEquals(1, sub1.getNumberLeaves());
 			assertEquals(vid1, sub1.getGroupID());
@@ -148,7 +148,7 @@ public class SharedSlotsTest extends TestLogger {
 			SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION);
 			assertNotNull(sub2);
 			
-			assertNull(sub2.getExecutedVertex());
+			assertNull(sub2.getPayload());
 			assertEquals(Locality.UNCONSTRAINED, sub2.getLocality());
 			assertEquals(1, sub2.getNumberLeaves());
 			assertEquals(vid2, sub2.getGroupID());
@@ -167,7 +167,7 @@ public class SharedSlotsTest extends TestLogger {
 			SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getTaskManagerLocation()));
 			assertNotNull(sub3);
 			
-			assertNull(sub3.getExecutedVertex());
+			assertNull(sub3.getPayload());
 			assertEquals(Locality.LOCAL, sub3.getLocality());
 			assertEquals(1, sub3.getNumberLeaves());
 			assertEquals(vid3, sub3.getGroupID());
@@ -187,7 +187,7 @@ public class SharedSlotsTest extends TestLogger {
 					Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getTaskManagerLocation()));
 			assertNotNull(sub4);
 			
-			assertNull(sub4.getExecutedVertex());
+			assertNull(sub4.getPayload());
 			assertEquals(Locality.NON_LOCAL, sub4.getLocality());
 			assertEquals(1, sub4.getNumberLeaves());
 			assertEquals(vid4, sub4.getGroupID());
@@ -204,7 +204,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4));
 			
 			// release from the root.
-			sharedSlot.releaseSlot();
+			sharedSlot.releaseInstanceSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sub1.isReleased());
@@ -264,7 +264,7 @@ public class SharedSlotsTest extends TestLogger {
 			
 			// release from the leaves.
 			
-			sub2.releaseSlot();
+			sub2.releaseInstanceSlot();
 
 			assertTrue(sharedSlot.isAlive());
 			assertTrue(sub1.isAlive());
@@ -279,7 +279,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(2, sharedSlot.getNumberLeaves());
 
 			
-			sub1.releaseSlot();
+			sub1.releaseInstanceSlot();
 
 			assertTrue(sharedSlot.isAlive());
 			assertTrue(sub1.isReleased());
@@ -293,7 +293,7 @@ public class SharedSlotsTest extends TestLogger {
 			
 			assertEquals(1, sharedSlot.getNumberLeaves());
 
-			sub3.releaseSlot();
+			sub3.releaseInstanceSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sub1.isReleased());
@@ -348,7 +348,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, assignment.getNumberOfSlots());
 			
 			
-			sub2.releaseSlot();
+			sub2.releaseInstanceSlot();
 
 			assertEquals(1, sharedSlot.getNumberLeaves());
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
@@ -366,8 +366,8 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
 			assertEquals(1, assignment.getNumberOfSlots());
 			
-			sub3.releaseSlot();
-			sub1.releaseSlot();
+			sub3.releaseInstanceSlot();
+			sub1.releaseInstanceSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertEquals(0, sharedSlot.getNumberLeaves());
@@ -443,7 +443,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertFalse(constraint.isAssigned());
 			
 			// we do not immediately lock the location
-			headSlot.releaseSlot();
+			headSlot.releaseInstanceSlot();
 			assertEquals(1, sharedSlot.getNumberLeaves());
 
 			assertNotNull(constraint.getSharedSlot());
@@ -468,8 +468,8 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(4, sharedSlot.getNumberLeaves());
 			
 			// we release our co-location constraint tasks
-			headSlot.releaseSlot();
-			tailSlot.releaseSlot();
+			headSlot.releaseInstanceSlot();
+			tailSlot.releaseInstanceSlot();
 
 			assertEquals(2, sharedSlot.getNumberLeaves());
 			assertTrue(headSlot.isReleased());
@@ -501,10 +501,10 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID());
 			
 			// release all
-			sourceSlot.releaseSlot();
-			headSlot.releaseSlot();
-			tailSlot.releaseSlot();
-			sinkSlot.releaseSlot();
+			sourceSlot.releaseInstanceSlot();
+			headSlot.releaseInstanceSlot();
+			tailSlot.releaseInstanceSlot();
+			sinkSlot.releaseInstanceSlot();
 			
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sourceSlot.isReleased());
@@ -577,10 +577,10 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(4, sharedSlot.getNumberLeaves());
 
 			// release all
-			sourceSlot.releaseSlot();
-			headSlot.releaseSlot();
-			tailSlot.releaseSlot();
-			sinkSlot.releaseSlot();
+			sourceSlot.releaseInstanceSlot();
+			headSlot.releaseInstanceSlot();
+			tailSlot.releaseInstanceSlot();
+			sinkSlot.releaseInstanceSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sourceSlot.isReleased());
@@ -618,7 +618,7 @@ public class SharedSlotsTest extends TestLogger {
 			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
 
 			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
-			sub.releaseSlot();
+			sub.releaseInstanceSlot();
 			
 			assertTrue(sub.isReleased());
 			assertTrue(sharedSlot.isReleased());
@@ -654,7 +654,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertNull(sub.getGroupID());
 			assertEquals(constraint.getSharedSlot(), sub.getParent());
 			
-			sub.releaseSlot();
+			sub.releaseInstanceSlot();
 
 			assertTrue(sub.isReleased());
 			assertTrue(sharedSlot.isReleased());

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index db71210..42cbbbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -18,22 +18,21 @@
 
 package org.apache.flink.runtime.instance;
 
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
-
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import org.mockito.Matchers;
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class SimpleSlotTest extends TestLogger {
 
@@ -45,7 +44,7 @@ public class SimpleSlotTest extends TestLogger {
 				SimpleSlot slot = getSlot();
 				assertTrue(slot.isAlive());
 
-				slot.releaseSlot();
+				slot.releaseInstanceSlot();
 				assertFalse(slot.isAlive());
 				assertTrue(slot.isCanceled());
 				assertTrue(slot.isReleased());
@@ -76,19 +75,19 @@ public class SimpleSlotTest extends TestLogger {
 	@Test
 	public void testSetExecutionVertex() {
 		try {
-			Execution ev = mock(Execution.class);
-			Execution ev_2 = mock(Execution.class);
+			TestingPayload payload1 = new TestingPayload();
+			TestingPayload payload2 = new TestingPayload();
 
 			// assign to alive slot
 			{
 				SimpleSlot slot = getSlot();
 
-				assertTrue(slot.setExecution(ev));
-				assertEquals(ev, slot.getExecutedVertex());
+				assertTrue(slot.tryAssignPayload(payload1));
+				assertEquals(payload1, slot.getPayload());
 
 				// try to add another one
-				assertFalse(slot.setExecution(ev_2));
-				assertEquals(ev, slot.getExecutedVertex());
+				assertFalse(slot.tryAssignPayload(payload2));
+				assertEquals(payload1, slot.getPayload());
 			}
 
 			// assign to canceled slot
@@ -96,8 +95,8 @@ public class SimpleSlotTest extends TestLogger {
 				SimpleSlot slot = getSlot();
 				assertTrue(slot.markCancelled());
 
-				assertFalse(slot.setExecution(ev));
-				assertNull(slot.getExecutedVertex());
+				assertFalse(slot.tryAssignPayload(payload1));
+				assertNull(slot.getPayload());
 			}
 
 			// assign to released marked slot
@@ -106,17 +105,17 @@ public class SimpleSlotTest extends TestLogger {
 				assertTrue(slot.markCancelled());
 				assertTrue(slot.markReleased());
 
-				assertFalse(slot.setExecution(ev));
-				assertNull(slot.getExecutedVertex());
+				assertFalse(slot.tryAssignPayload(payload1));
+				assertNull(slot.getPayload());
 			}
 			
 			// assign to released
 			{
 				SimpleSlot slot = getSlot();
-				slot.releaseSlot();
+				slot.releaseInstanceSlot();
 
-				assertFalse(slot.setExecution(ev));
-				assertNull(slot.getExecutedVertex());
+				assertFalse(slot.tryAssignPayload(payload1));
+				assertNull(slot.getPayload());
 			}
 		}
 		catch (Exception e) {
@@ -125,27 +124,6 @@ public class SimpleSlotTest extends TestLogger {
 		}
 	}
 
-	@Test
-	public void testReleaseCancelsVertex() {
-		try {
-			Execution ev = mock(Execution.class);
-
-			SimpleSlot slot = getSlot();
-			assertTrue(slot.setExecution(ev));
-			assertEquals(ev, slot.getExecutedVertex());
-
-			slot.releaseSlot();
-			slot.releaseSlot();
-			slot.releaseSlot();
-
-			verify(ev, times(1)).fail(Matchers.any(Throwable.class));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
 	public static SimpleSlot getSlot() throws Exception {
 		ResourceID resourceID = ResourceID.generate();
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 271bc2a..450d377 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -154,7 +154,7 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(future2.isDone());
 
 			assertNotEquals(slot1, slot2);
-			assertTrue(slot1.isReleased());
+			assertFalse(slot1.isAlive());
 			assertTrue(slot2.isAlive());
 			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
 			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
@@ -198,7 +198,7 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(future2.isDone());
 
 			assertNotEquals(slot1, slot2);
-			assertTrue(slot1.isReleased());
+			assertFalse(slot1.isAlive());
 			assertTrue(slot2.isAlive());
 			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
 			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
@@ -294,7 +294,7 @@ public class SlotPoolTest extends TestLogger {
 			// wait until the slot has been returned
 			slotReturnFuture.get();
 
-			assertTrue(slot1.isReleased());
+			assertFalse(slot1.isAlive());
 
 			// slot released and not usable, second allocation still not fulfilled
 			Thread.sleep(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
index 36a47b7..925933d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -38,12 +39,12 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	private final TaskManagerGateway taskManagerGateway;
 
-	private final CompletableFuture<?> releaseFuture;
-
-	private final AtomicReference<Execution> executionReference;
+	private final AtomicReference<Payload> payloadReference;
 
 	private final int slotNumber;
 
+	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
 	private final AllocationID allocationId;
 
 	public TestingLogicalSlot() {
@@ -61,8 +62,7 @@ public class TestingLogicalSlot implements LogicalSlot {
 			AllocationID allocationId) {
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
-		this.releaseFuture = new CompletableFuture<>();
-		this.executionReference = new AtomicReference<>();
+		this.payloadReference = new AtomicReference<>();
 		this.slotNumber = slotNumber;
 		this.allocationId = Preconditions.checkNotNull(allocationId);
 	}
@@ -83,23 +83,21 @@ public class TestingLogicalSlot implements LogicalSlot {
 	}
 
 	@Override
-	public boolean isCanceled() {
-		return releaseFuture.isDone();
+	public boolean tryAssignPayload(Payload payload) {
+		return payloadReference.compareAndSet(null, payload);
 	}
 
+	@Nullable
 	@Override
-	public boolean isReleased() {
-		return releaseFuture.isDone();
+	public Payload getPayload() {
+		return payloadReference.get();
 	}
 
 	@Override
-	public boolean setExecution(Execution execution) {
-		return executionReference.compareAndSet(null, execution);
-	}
-
-	@Override
-	public void releaseSlot() {
+	public CompletableFuture<?> releaseSlot() {
 		releaseFuture.complete(null);
+
+		return releaseFuture;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
new file mode 100644
index 0000000..3369882
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Simple payload implementation for testing purposes.
+ *
+ * <p>Failing the payload completes its termination future and records the
+ * first non-null failure cause, so tests can assert why it was failed.
+ */
+public class TestingPayload implements LogicalSlot.Payload {
+
+	private final CompletableFuture<?> terminationFuture;
+
+	private final AtomicReference<Throwable> failureCause;
+
+	public TestingPayload() {
+		this.terminationFuture = new CompletableFuture<>();
+		this.failureCause = new AtomicReference<>();
+	}
+
+	/**
+	 * Fails this payload: remembers the cause (if no non-null cause was recorded
+	 * yet) and completes the termination future normally.
+	 */
+	@Override
+	public void fail(Throwable cause) {
+		// record the cause before completing, so observers of the future can see it
+		failureCause.compareAndSet(null, cause);
+		terminationFuture.complete(null);
+	}
+
+	@Override
+	public CompletableFuture<?> getTerminalStateFuture() {
+		return terminationFuture;
+	}
+
+	/**
+	 * Returns the first non-null cause passed to {@link #fail(Throwable)}, or
+	 * {@code null} if the payload has not been failed with a cause.
+	 */
+	@Nullable
+	public Throwable getFailureCause() {
+		return failureCause.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 1344aef..3f267ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -143,7 +143,7 @@ public class CoLocationConstraintTest {
 			assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
 			
 			// release the slot
-			slot2_1.releaseSlot();
+			slot2_1.releaseInstanceSlot();
 
 			// we should still have a location
 			assertTrue(constraint.isAssigned());

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 371cca7..2ece70f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -49,7 +50,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the {@link Scheduler} when scheduling individual tasks.
  */
-public class SchedulerIsolatedTasksTest {
+public class SchedulerIsolatedTasksTest extends TestLogger {
 
 	@Test
 	public void testAddAndRemoveInstance() {
@@ -297,10 +298,10 @@ public class SchedulerIsolatedTasksTest {
 			i2.markDead();
 			
 			for (LogicalSlot slot : slots) {
-				if (Objects.equals(slot.getTaskManagerLocation().getResourceID(), i2.getTaskManagerID())) {
-					assertTrue(slot.isCanceled());
+				if (slot.getTaskManagerLocation().getResourceID().equals(i2.getTaskManagerID())) {
+					assertFalse(slot.isAlive());
 				} else {
-					assertFalse(slot.isCanceled());
+					assertTrue(slot.isAlive());
 				}
 				
 				slot.releaseSlot();

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
new file mode 100644
index 0000000..7c124ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Testing {@link SlotOwner} that acknowledges every returned slot, optionally notifying a callback.
+ */
+public class TestingSlotOwner implements SlotOwner {
+
+	/** Callback invoked for each returned slot; {@code null} means no callback is installed. */
+	private volatile Consumer<Slot> returnAllocatedSlotConsumer;
+
+	public void setReturnAllocatedSlotConsumer(Consumer<Slot> consumer) {
+		returnAllocatedSlotConsumer = consumer;
+	}
+
+	@Override
+	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+		// read the volatile field once so the null check and the call use the same value
+		final Consumer<Slot> callback = returnAllocatedSlotConsumer;
+		if (callback != null) {
+			callback.accept(slot);
+		}
+		return CompletableFuture.completedFuture(true);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2de6f9e..2d8d02d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.net.InetAddress
 import java.util
 import java.util.concurrent._
 import java.util.{Collections, UUID}
@@ -39,9 +38,7 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import org.apache.flink.runtime.jobmaster.JobMaster
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
-import org.apache.flink.runtime.metrics.{MetricRegistryImpl, MetricRegistryConfiguration}
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils