You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/10 14:21:44 UTC

[5/9] flink git commit: [FLINK-9324] Wait for slot release before completing release future in SingleLogicalSlot

[FLINK-9324] Wait for slot release before completing release future in SingleLogicalSlot

This commit completes the SingleLogicalSlot's release future only after the SlotOwner has
acknowledged the release. That way the ExecutionGraph will only recover after all of its
slots have been returned to the SlotPool.

As a side effect, the changes in this commit should reduce the number of redundant release
calls sent to the SlotOwner, which cluttered the debug logs.

This closes #5980.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7eb6aca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7eb6aca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7eb6aca

Branch: refs/heads/master
Commit: c7eb6acaf95a6656ec6bd0a0b401c1944473e7f2
Parents: 3c86b6b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 9 15:29:36 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 10 16:18:31 2018 +0200

----------------------------------------------------------------------
 .../jobmanager/slots/SlotAndLocality.java       |  60 ----
 .../jobmaster/slotpool/AllocatedSlot.java       |  22 +-
 .../jobmaster/slotpool/SingleLogicalSlot.java   |  89 +++--
 .../jobmaster/slotpool/SlotAndLocality.java     |  59 ++++
 .../runtime/jobmaster/slotpool/SlotPool.java    |   7 +-
 .../jobmaster/slotpool/SlotSharingManager.java  |  93 +++---
 .../jobmaster/slotpool/AvailableSlotsTest.java  |   1 -
 .../jobmaster/slotpool/DummyPayload.java        |  50 +++
 .../slotpool/SingleLogicalSlotTest.java         | 323 +++++++++++++++++++
 .../jobmaster/slotpool/SlotPoolTest.java        |  31 +-
 .../slotpool/SlotSharingManagerTest.java        |   6 +-
 11 files changed, 556 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
deleted file mode 100644
index fed26c9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
-
-import javax.annotation.Nonnull;
-
-/**
- * A combination of a {@link AllocatedSlot} and a {@link Locality}.
- */
-public class SlotAndLocality {
-
-	@Nonnull
-	private final AllocatedSlot slot;
-
-	@Nonnull
-	private final Locality locality;
-
-	public SlotAndLocality(@Nonnull AllocatedSlot slot, @Nonnull Locality locality) {
-		this.slot = slot;
-		this.locality = locality;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Nonnull
-	public AllocatedSlot getSlot() {
-		return slot;
-	}
-
-	@Nonnull
-	public Locality getLocality() {
-		return locality;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return "Slot: " + slot + " (" + locality + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
index d3b2ab2..75195cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
  * JobManager. All slots had a default unknown resource profile. 
  */
-public class AllocatedSlot implements SlotContext {
+class AllocatedSlot implements SlotContext {
 
 	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
 	private final AllocationID allocationId;
@@ -171,21 +171,13 @@ public class AllocatedSlot implements SlotContext {
 	 * then it is removed from the slot.
 	 *
 	 * @param cause of the release operation
-	 * @return true if the payload could be released and was removed from the slot, otherwise false
 	 */
-	public boolean releasePayload(Throwable cause) {
+	public void releasePayload(Throwable cause) {
 		final Payload payload = payloadReference.get();
 
 		if (payload != null) {
-			if (payload.release(cause)) {
-				payloadReference.set(null);
-
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			return true;
+			payload.release(cause);
+			payloadReference.set(null);
 		}
 	}
 
@@ -222,12 +214,10 @@ public class AllocatedSlot implements SlotContext {
 	interface Payload {
 
 		/**
-		 * Releases the payload. If the payload could be released, then it returns true,
-		 * otherwise false.
+		 * Releases the payload
 		 *
 		 * @param cause of the payload release
-		 * @return true if the payload could be released, otherwise false
 		 */
-		boolean release(Throwable cause);
+		void release(Throwable cause);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 0736b56..9ee4db8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Function;
 
 /**
  * Implementation of the {@link LogicalSlot} which is used by the {@link SlotPool}.
@@ -44,6 +45,11 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
 		Payload.class,
 		"payload");
 
+	private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		SingleLogicalSlot.class,
+		State.class,
+		"state");
+
 	private final SlotRequestId slotRequestId;
 
 	private final SlotContext slotContext;
@@ -58,6 +64,10 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
 	// owner of this slot to which it is returned upon release
 	private final SlotOwner slotOwner;
 
+	private final CompletableFuture<Void> releaseFuture;
+
+	private volatile State state;
+
 	// LogicalSlot.Payload of this slot
 	private volatile Payload payload;
 
@@ -72,8 +82,10 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
 		this.slotSharingGroupId = slotSharingGroupId;
 		this.locality = Preconditions.checkNotNull(locality);
 		this.slotOwner = Preconditions.checkNotNull(slotOwner);
+		this.releaseFuture = new CompletableFuture<>();
 
-		payload = null;
+		this.state = State.ALIVE;
+		this.payload = null;
 	}
 
 	@Override
@@ -93,20 +105,11 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
 
 	@Override
 	public boolean isAlive() {
-		final Payload currentPayload = payload;
-
-		if (currentPayload != null) {
-			return !currentPayload.getTerminalStateFuture().isDone();
-		} else {
-			// We are always alive if there is no payload assigned yet.
-			// If this slot is released and no payload is assigned, then the TERMINATED_PAYLOAD is assigned
-			return true;
-		}
+		return state == State.ALIVE;
 	}
 
 	@Override
 	public boolean tryAssignPayload(Payload payload) {
-		Preconditions.checkNotNull(payload);
 		return PAYLOAD_UPDATER.compareAndSet(this, null, payload);
 	}
 
@@ -118,15 +121,12 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
 
 	@Override
 	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-		// set an already terminated payload if the payload of this slot is still empty
-		tryAssignPayload(TERMINATED_PAYLOAD);
-
-		// notify the payload that the slot will be released
-		payload.fail(cause);
+		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
+			final CompletableFuture<?> payloadTerminalStateFuture = signalPayloadRelease(cause);
+			returnSlotToOwner(payloadTerminalStateFuture);
+		}
 
-		// Wait until the payload has been terminated. Only then, we return the slot to its rightful owner
-		return payload.getTerminalStateFuture()
-			.whenComplete((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this));
+		return releaseFuture;
 	}
 
 	@Override
@@ -159,10 +159,55 @@ public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
 	 * the logical slot.
 	 *
 	 * @param cause of the payload release
-	 * @return true if the logical slot's payload could be released, otherwise false
 	 */
 	@Override
-	public boolean release(Throwable cause) {
-		return releaseSlot(cause).isDone();
+	public void release(Throwable cause) {
+		if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
+			signalPayloadRelease(cause);
+		}
+		markReleased();
+		releaseFuture.complete(null);
+	}
+
+	private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
+		tryAssignPayload(TERMINATED_PAYLOAD);
+		payload.fail(cause);
+
+		return payload.getTerminalStateFuture();
+	}
+
+	private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
+		final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+			if (state == State.RELEASING) {
+				return slotOwner.returnAllocatedSlot(this);
+			} else {
+				return CompletableFuture.completedFuture(true);
+			}
+		}).thenCompose(Function.identity());
+
+		slotReturnFuture.whenComplete(
+			(Object ignored, Throwable throwable) -> {
+				markReleased();
+
+				if (throwable != null) {
+					releaseFuture.completeExceptionally(throwable);
+				} else {
+					releaseFuture.complete(null);
+				}
+			});
+	}
+
+	private void markReleased() {
+		state = State.RELEASED;
+	}
+
+	// -------------------------------------------------------------------------
+	// Internal classes
+	// -------------------------------------------------------------------------
+
+	enum State {
+		ALIVE,
+		RELEASING,
+		RELEASED
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotAndLocality.java
new file mode 100644
index 0000000..4befb51
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotAndLocality.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A combination of a {@link AllocatedSlot} and a {@link Locality}.
+ */
+public class SlotAndLocality {
+
+	@Nonnull
+	private final AllocatedSlot slot;
+
+	@Nonnull
+	private final Locality locality;
+
+	public SlotAndLocality(@Nonnull AllocatedSlot slot, @Nonnull Locality locality) {
+		this.slot = slot;
+		this.locality = locality;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Nonnull
+	public AllocatedSlot getSlot() {
+		return slot;
+	}
+
+	@Nonnull
+	public Locality getLocality() {
+		return locality;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "Slot: " + slot + " (" + locality + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 2b78656..f53104d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -764,10 +763,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
 
 				if (allocatedSlot != null) {
-					// sanity check
-					if (allocatedSlot.releasePayload(cause)) {
-						tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
-					}
+					allocatedSlot.releasePayload(cause);
+					tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 				} else {
 					log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index 242d645..8fcf813 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Manager which is responsible for slot sharing. Slot sharing allows to run different
@@ -281,9 +282,8 @@ public class SlotSharingManager {
 		 * Release the task slot.
 		 *
 		 * @param cause for the release
-		 * @return true if the slot could be released, otherwise false
 		 */
-		public abstract boolean release(Throwable cause);
+		public abstract void release(Throwable cause);
 	}
 
 	/**
@@ -433,60 +433,51 @@ public class SlotSharingManager {
 		}
 
 		@Override
-		public boolean release(Throwable cause) {
+		public void release(Throwable cause) {
 			releasingChildren = true;
 
 			// first release all children and remove them if they could be released immediately
-			children.values().removeIf(node -> {
-				boolean release = node.release(cause);
+			for (TaskSlot taskSlot : children.values()) {
+				taskSlot.release(cause);
+				allTaskSlots.remove(taskSlot.getSlotRequestId());
+			}
 
-				if (release) {
-					allTaskSlots.remove(node.getSlotRequestId());
-				}
-
-				return release;
-			});
+			children.clear();
 
 			releasingChildren = false;
 
-			if (children.isEmpty()) {
-				if (parent != null) {
-					// we remove ourselves from our parent if we no longer have children
-					parent.releaseChild(getGroupId());
-				} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
-					// we are the root node --> remove the root node from the list of task slots
+			if (parent != null) {
+				// we remove ourselves from our parent if we no longer have children
+				parent.releaseChild(getGroupId());
+			} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
+				// we are the root node --> remove the root node from the list of task slots
 
-					if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
-						synchronized (lock) {
-							// the root node should still be unresolved
-							unresolvedRootSlots.remove(getSlotRequestId());
-						}
-					} else {
-						// the root node should be resolved --> we can access the slot context
-						final SlotContext slotContext = slotContextFuture.getNow(null);
+				if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
+					synchronized (lock) {
+						// the root node should still be unresolved
+						unresolvedRootSlots.remove(getSlotRequestId());
+					}
+				} else {
+					// the root node should be resolved --> we can access the slot context
+					final SlotContext slotContext = slotContextFuture.getNow(null);
 
-						if (slotContext != null) {
-							synchronized (lock) {
-								final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+					if (slotContext != null) {
+						synchronized (lock) {
+							final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
 
-								if (multiTaskSlots != null) {
-									multiTaskSlots.remove(this);
+							if (multiTaskSlots != null) {
+								multiTaskSlots.remove(this);
 
-									if (multiTaskSlots.isEmpty()) {
-										resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
-									}
+								if (multiTaskSlots.isEmpty()) {
+									resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
 								}
 							}
 						}
 					}
-
-					// release the underlying allocated slot
-					allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
 				}
 
-				return true;
-			} else {
-				return false;
+				// release the underlying allocated slot
+				allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
 			}
 		}
 
@@ -518,7 +509,7 @@ public class SlotSharingManager {
 		private final MultiTaskSlot parent;
 
 		// future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
-		private final CompletableFuture<LogicalSlot> logicalSlotFuture;
+		private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture;
 
 		private SingleTaskSlot(
 				SlotRequestId slotRequestId,
@@ -530,7 +521,7 @@ public class SlotSharingManager {
 			this.parent = Preconditions.checkNotNull(parent);
 
 			Preconditions.checkNotNull(locality);
-			logicalSlotFuture = parent.getSlotContextFuture()
+			singleLogicalSlotFuture = parent.getSlotContextFuture()
 				.thenApply(
 					(SlotContext slotContext) ->
 						new SingleLogicalSlot(
@@ -542,29 +533,23 @@ public class SlotSharingManager {
 		}
 
 		CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
-			return logicalSlotFuture;
+			return singleLogicalSlotFuture.thenApply(Function.identity());
 		}
 
 		@Override
-		public boolean release(Throwable cause) {
-			logicalSlotFuture.completeExceptionally(cause);
+		public void release(Throwable cause) {
+			singleLogicalSlotFuture.completeExceptionally(cause);
 
 			boolean pendingLogicalSlotRelease = false;
 
-			if (logicalSlotFuture.isDone() && !logicalSlotFuture.isCompletedExceptionally()) {
+			if (singleLogicalSlotFuture.isDone() && !singleLogicalSlotFuture.isCompletedExceptionally()) {
 				// we have a single task slot which we first have to release
-				final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null);
+				final SingleLogicalSlot singleLogicalSlot = singleLogicalSlotFuture.getNow(null);
 
-				if ((logicalSlot != null) && (logicalSlot.isAlive())) {
-					pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone();
-				}
-			}
-
-			if (!pendingLogicalSlotRelease) {
-				parent.releaseChild(getGroupId());
+				singleLogicalSlot.release(cause);
 			}
 
-			return !pendingLogicalSlotRelease;
+			parent.releaseChild(getGroupId());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
index ed090d8..4e9ca88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.testutils.category.New;

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DummyPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DummyPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DummyPayload.java
new file mode 100644
index 0000000..b88c292
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DummyPayload.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link LogicalSlot.Payload} implementation for test purposes.
+ */
+final class DummyPayload implements LogicalSlot.Payload {
+
+    private final CompletableFuture<?> terminalStateFuture;
+
+    DummyPayload() {
+    	this(new CompletableFuture<>());
+	}
+
+    DummyPayload(CompletableFuture<?> terminalStateFuture) {
+        this.terminalStateFuture = Preconditions.checkNotNull(terminalStateFuture);
+    }
+
+    @Override
+    public void fail(Throwable cause) {
+        terminalStateFuture.complete(null);
+    }
+
+    @Override
+    public CompletableFuture<?> getTerminalStateFuture() {
+        return terminalStateFuture;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
new file mode 100644
index 0000000..c5beda4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+
+/**
+ * Tests for the {@link SingleLogicalSlot} class.
+ *
+ * <p>The tests cover payload assignment, the alive flag and the ordering of the
+ * release protocol between the payload, the slot and its {@link SlotOwner}.
+ */
+public class SingleLogicalSlotTest extends TestLogger {
+
+	/**
+	 * Tests that only the first payload assignment succeeds and that the first
+	 * payload stays assigned afterwards.
+	 */
+	@Test
+	public void testPayloadAssignment() {
+		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot();
+		final DummyPayload dummyPayload1 = new DummyPayload();
+		final DummyPayload dummyPayload2 = new DummyPayload();
+
+		assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload1), is(true));
+		assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload2), is(false));
+
+		assertThat(singleLogicalSlot.getPayload(), sameInstance(dummyPayload1));
+	}
+
+	/**
+	 * Creates a {@link SingleLogicalSlot} which is owned by a {@link DummySlotOwner}.
+	 */
+	private SingleLogicalSlot createSingleLogicalSlot() {
+		return createSingleLogicalSlot(new DummySlotOwner());
+	}
+
+	/**
+	 * Creates a {@link SingleLogicalSlot} with a fresh {@link SlotRequestId}, a
+	 * {@link DummySlotContext} and {@link Locality#LOCAL}.
+	 *
+	 * @param slotOwner owner the slot is handed back to on release
+	 */
+	private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) {
+		return new SingleLogicalSlot(
+			new SlotRequestId(),
+			new DummySlotContext(),
+			// NOTE(review): presumably the optional slot sharing group id; confirm against the constructor
+			null,
+			Locality.LOCAL,
+			slotOwner);
+	}
+
+	/**
+	 * Tests that the slot is alive before and after the payload assignment and
+	 * that it is no longer alive as soon as the release has been triggered.
+	 */
+	@Test
+	public void testAlive() throws Exception {
+		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot();
+		final DummyPayload dummyPayload = new DummyPayload();
+
+		assertThat(singleLogicalSlot.isAlive(), is(true));
+
+		assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload), is(true));
+		assertThat(singleLogicalSlot.isAlive(), is(true));
+
+		final CompletableFuture<?> releaseFuture = singleLogicalSlot.releaseSlot(new FlinkException("Test exception"));
+
+		// the slot must be marked as not alive even before the release has completed
+		assertThat(singleLogicalSlot.isAlive(), is(false));
+
+		releaseFuture.get();
+
+		assertThat(singleLogicalSlot.isAlive(), is(false));
+	}
+
+	/**
+	 * Tests that no payload can be assigned once the slot release has been triggered.
+	 */
+	@Test
+	public void testPayloadAssignmentAfterRelease() {
+		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot();
+		final DummyPayload dummyPayload = new DummyPayload();
+
+		singleLogicalSlot.releaseSlot(new FlinkException("Test exception"));
+
+		assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload), is(false));
+	}
+
+	/**
+	 * Tests that the {@link AllocatedSlot.Payload#release(Throwable)} does not wait
+	 * for the payload to reach a terminal state.
+	 */
+	@Test
+	public void testAllocatedSlotRelease() {
+		final CompletableFuture<LogicalSlot> returnSlotFuture = new CompletableFuture<>();
+		final WaitingSlotOwner waitingSlotOwner = new WaitingSlotOwner(returnSlotFuture, new CompletableFuture<>());
+		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(waitingSlotOwner);
+
+		final CompletableFuture<?> terminalStateFuture = new CompletableFuture<>();
+		final CompletableFuture<?> failFuture = new CompletableFuture<>();
+		final ManualTestingPayload dummyPayload = new ManualTestingPayload(failFuture, terminalStateFuture);
+
+		assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload), is(true));
+
+		singleLogicalSlot.release(new FlinkException("Test exception"));
+
+		assertThat(failFuture.isDone(), is(true));
+		// we don't require the logical slot to return to the owner because
+		// the release call should only come from the owner
+		assertThat(returnSlotFuture.isDone(), is(false));
+	}
+
+	/**
+	 * Tests that the slot release is only signaled after the owner has
+	 * taken it back.
+	 */
+	@Test
+	public void testSlotRelease() {
+		final CompletableFuture<LogicalSlot> returnedSlotFuture = new CompletableFuture<>();
+		final CompletableFuture<Boolean> returnSlotResponseFuture = new CompletableFuture<>();
+		final WaitingSlotOwner waitingSlotOwner = new WaitingSlotOwner(returnedSlotFuture, returnSlotResponseFuture);
+		final CompletableFuture<?> terminalStateFuture = new CompletableFuture<>();
+		final CompletableFuture<?> failFuture = new CompletableFuture<>();
+		final ManualTestingPayload dummyPayload = new ManualTestingPayload(failFuture, terminalStateFuture);
+
+		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(waitingSlotOwner);
+
+		assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload), is(true));
+
+		final CompletableFuture<?> releaseFuture = singleLogicalSlot.releaseSlot(new FlinkException("Test exception"));
+
+		// step 1: the payload has been failed, but nothing else has happened yet
+		assertThat(releaseFuture.isDone(), is(false));
+		assertThat(returnedSlotFuture.isDone(), is(false));
+		assertThat(failFuture.isDone(), is(true));
+
+		// step 2: once the payload is terminal, the slot is handed back to its owner
+		terminalStateFuture.complete(null);
+
+		assertThat(returnedSlotFuture.isDone(), is(true));
+
+		// step 3: only the owner's response completes the release future
+		returnSlotResponseFuture.complete(true);
+
+		assertThat(releaseFuture.isDone(), is(true));
+	}
+
+	/**
+	 * Tests that concurrent release operations only trigger the failing of the payload and
+	 * the return of the slot once.
+	 */
+	@Test
+	public void testConcurrentReleaseOperations() throws Exception {
+		final CountingSlotOwner countingSlotOwner = new CountingSlotOwner();
+		final CountingFailPayload countingFailPayload = new CountingFailPayload();
+		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(countingSlotOwner);
+
+		assertThat(singleLogicalSlot.tryAssignPayload(countingFailPayload), is(true));
+
+		final ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+		try {
+			final int numberConcurrentOperations = 10;
+			final Collection<CompletableFuture<?>> releaseOperationFutures = new ArrayList<>(numberConcurrentOperations);
+
+			for (int i = 0; i < numberConcurrentOperations; i++) {
+				// Run the release calls on the dedicated pool. Without the executor argument
+				// runAsync falls back to ForkJoinPool.commonPool() and the pool created above
+				// would never be used.
+				final CompletableFuture<Void> releaseOperationFuture = CompletableFuture.runAsync(
+					() -> {
+						try {
+							singleLogicalSlot.releaseSlot(new FlinkException("Test exception")).get();
+						} catch (InterruptedException | ExecutionException e) {
+							ExceptionUtils.checkInterrupted(e);
+							throw new CompletionException(e);
+						}
+					},
+					executorService);
+
+				releaseOperationFutures.add(releaseOperationFuture);
+			}
+
+			final FutureUtils.ConjunctFuture<Void> releaseOperationsFuture = FutureUtils.waitForAll(releaseOperationFutures);
+
+			releaseOperationsFuture.get();
+
+			assertThat(countingSlotOwner.getReleaseCount(), is(1));
+			assertThat(countingFailPayload.getFailCount(), is(1));
+		} finally {
+			executorService.shutdownNow();
+		}
+	}
+
+	/**
+	 * Payload which counts how often it has been failed and whose terminal state
+	 * future is always already completed.
+	 */
+	private static final class CountingFailPayload implements LogicalSlot.Payload {
+
+		private final AtomicInteger failCounter = new AtomicInteger(0);
+
+		int getFailCount() {
+			return failCounter.get();
+		}
+
+		@Override
+		public void fail(Throwable cause) {
+			failCounter.incrementAndGet();
+		}
+
+		@Override
+		public CompletableFuture<?> getTerminalStateFuture() {
+			return CompletableFuture.completedFuture(null);
+		}
+	}
+
+	/**
+	 * Slot owner which counts the returned slots and immediately acknowledges
+	 * every return with {@code true}.
+	 */
+	private static final class CountingSlotOwner implements SlotOwner {
+
+		private final AtomicInteger counter;
+
+		private CountingSlotOwner() {
+			this.counter = new AtomicInteger(0);
+		}
+
+		public int getReleaseCount() {
+			return counter.get();
+		}
+
+		@Override
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+			counter.incrementAndGet();
+			return CompletableFuture.completedFuture(true);
+		}
+	}
+
+	/**
+	 * Payload whose terminal state is controlled by the test. A call to
+	 * {@link #fail(Throwable)} completes the fail future exceptionally with the
+	 * given cause and leaves the terminal state future untouched.
+	 */
+	private static final class ManualTestingPayload implements LogicalSlot.Payload {
+
+		private final CompletableFuture<?> failFuture;
+
+		private final CompletableFuture<?> terminalStateFuture;
+
+		private ManualTestingPayload(CompletableFuture<?> failFuture, CompletableFuture<?> terminalStateFuture) {
+			this.failFuture = failFuture;
+			this.terminalStateFuture = terminalStateFuture;
+		}
+
+		@Override
+		public void fail(Throwable cause) {
+			failFuture.completeExceptionally(cause);
+		}
+
+		@Override
+		public CompletableFuture<?> getTerminalStateFuture() {
+			return terminalStateFuture;
+		}
+	}
+
+	/**
+	 * Slot owner which signals a returned slot by completing the first future and
+	 * answers the return call with the second, externally controlled future.
+	 */
+	private static final class WaitingSlotOwner implements SlotOwner {
+
+		private final CompletableFuture<LogicalSlot> returnAllocatedSlotFuture;
+
+		private final CompletableFuture<Boolean> returnAllocatedSlotResponse;
+
+		private WaitingSlotOwner(CompletableFuture<LogicalSlot> returnAllocatedSlotFuture, CompletableFuture<Boolean> returnAllocatedSlotResponse) {
+			this.returnAllocatedSlotFuture = Preconditions.checkNotNull(returnAllocatedSlotFuture);
+			this.returnAllocatedSlotResponse = Preconditions.checkNotNull(returnAllocatedSlotResponse);
+		}
+
+		@Override
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+			returnAllocatedSlotFuture.complete(logicalSlot);
+			return returnAllocatedSlotResponse;
+		}
+	}
+
+	/**
+	 * {@link SlotContext} with a fresh {@link AllocationID}, a
+	 * {@link LocalTaskManagerLocation}, a {@link SimpleAckingTaskManagerGateway}
+	 * and the physical slot number 0.
+	 */
+	private static final class DummySlotContext implements SlotContext {
+
+		private final AllocationID allocationId;
+
+		private final TaskManagerLocation taskManagerLocation;
+
+		private final TaskManagerGateway taskManagerGateway;
+
+		DummySlotContext() {
+			allocationId = new AllocationID();
+			taskManagerLocation = new LocalTaskManagerLocation();
+			taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		}
+
+		@Override
+		public AllocationID getAllocationId() {
+			return allocationId;
+		}
+
+		@Override
+		public TaskManagerLocation getTaskManagerLocation() {
+			return taskManagerLocation;
+		}
+
+		@Override
+		public int getPhysicalSlotNumber() {
+			return 0;
+		}
+
+		@Override
+		public TaskManagerGateway getTaskManagerGateway() {
+			return taskManagerGateway;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index eae5238..a20332b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -58,8 +57,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -346,24 +343,7 @@ public class SlotPoolTest extends TestLogger {
 
 		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
-		final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
-
-		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
-			@Override
-			public CompletableFuture<Acknowledge> releaseSlot(
-					SlotRequestId slotRequestId,
-					@Nullable SlotSharingGroupId slotSharingGroupId,
-					@Nullable Throwable cause) {
-				super.releaseSlot(
-					slotRequestId,
-					slotSharingGroupId,
-					cause);
-
-				slotReturnFuture.complete(true);
-
-				return CompletableFuture.completedFuture(Acknowledge.get());
-			}
-		};
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -396,11 +376,14 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(future1.isDone());
 			assertFalse(future2.isDone());
 
-			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
+			final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+			final DummyPayload dummyPayload = new DummyPayload(releaseFuture);
 
-			// wait until the slot has been returned
-			slotReturnFuture.get();
+			slot1.tryAssignPayload(dummyPayload);
+
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
 
+			releaseFuture.get();
 			assertFalse(slot1.isAlive());
 
 			// slot released and not usable, second allocation still not fulfilled

http://git-wip-us.apache.org/repos/asf/flink/blob/c7eb6aca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 4c56c63..9297f59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -105,7 +105,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		assertTrue(slotSharingManager.contains(slotRequestId));
 
-		assertTrue(rootSlot.release(new FlinkException("Test exception")));
+		rootSlot.release(new FlinkException("Test exception"));
 
 		// check that we return the allocated slot
 		assertEquals(allocatedSlotRequestId, slotReleasedFuture.get());
@@ -194,7 +194,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		assertFalse(singleTaskSlotFuture.isDone());
 
 		FlinkException testException = new FlinkException("Test exception");
-		assertTrue(singleTaskSlot.release(testException));
+		singleTaskSlot.release(testException);
 
 		// check that we fail the single task slot future
 		assertTrue(singleTaskSlotFuture.isCompletedExceptionally());
@@ -203,7 +203,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		// the root slot has still one child
 		assertTrue(slotSharingManager.contains(rootSlotRequestId));
 
-		assertTrue(multiTaskSlot.release(testException));
+		multiTaskSlot.release(testException);
 
 		assertEquals(allocatedSlotRequestId, releasedSlotFuture.get());
 		assertFalse(slotSharingManager.contains(rootSlotRequestId));