You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:26 UTC

[09/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
new file mode 100644
index 0000000..4511bf6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+	Payload TERMINATED_PAYLOAD = new Payload() {
+
+		private final CompletableFuture<?> completedTerminationFuture = CompletableFuture.completedFuture(null);
+		@Override
+		public void fail(Throwable cause) {
+			// ignore
+		}
+
+		@Override
+		public CompletableFuture<?> getTerminalStateFuture() {
+			return completedTerminationFuture;
+		}
+	};
+
+	/**
+	 * Return the TaskManager location of this slot.
+	 *
+	 * @return TaskManager location of this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Return the TaskManager gateway to talk to the TaskManager.
+	 *
+	 * @return TaskManager gateway to talk to the TaskManager
+	 */
+	TaskManagerGateway getTaskManagerGateway();
+
+	/**
+	 * Gets the locality of this slot.
+	 *
+	 * @return locality of this slot
+	 */
+	Locality getLocality();
+
+	/**
+	 * True if the slot is alive and has not been released.
+	 *
+	 * @return True if the slot is alive, otherwise false if the slot is released
+	 */
+	boolean isAlive();
+
+	/**
+	 * Tries to assign a payload to this slot. One can only assign a single
+	 * payload once.
+	 *
+	 * @param payload to be assigned to this slot.
+	 * @return true if the payload could be assigned, otherwise false
+	 */
+	boolean tryAssignPayload(Payload payload);
+
+	/**
+	 * Returns the set payload or null if none.
+	 *
+	 * @return Payload of this slot of null if none
+	 */
+	@Nullable
+	Payload getPayload();
+
+	/**
+	 * Releases this slot.
+	 *
+	 * @return Future which is completed once the slot has been released,
+	 * 		in case of a failure it is completed exceptionally
+	 * @deprecated Added because extended the actual releaseSlot method with cause parameter.
+	 */
+	default CompletableFuture<?> releaseSlot() {
+		return releaseSlot(null);
+	}
+
+	/**
+	 * Releases this slot.
+	 *
+	 * @param cause why the slot was released or null if none
+	 * @return future which is completed once the slot has been released
+	 */
+	CompletableFuture<?> releaseSlot(@Nullable Throwable cause);
+
+	/**
+	 * Gets the slot number on the TaskManager.
+	 *
+	 * @return slot number
+	 */
+	int getPhysicalSlotNumber();
+
+	/**
+	 * Gets the allocation id of this slot.
+	 *
+	 * @return allocation id of this slot
+	 */
+	AllocationID getAllocationId();
+
+	/**
+	 * Gets the slot request id uniquely identifying the request with which this
+	 * slot has been allocated.
+	 *
+	 * @return Unique id identifying the slot request with which this slot was allocated
+	 */
+	SlotRequestId getSlotRequestId();
+
+	/**
+	 * Gets the slot sharing group id to which this slot belongs.
+	 *
+	 * @return slot sharing group id of this slot or null, if none.
+	 */
+	@Nullable
+	SlotSharingGroupId getSlotSharingGroupId();
+
+	/**
+	 * Payload for a logical slot.
+	 */
+	interface Payload {
+
+		/**
+		 * Fail the payload with the given cause.
+		 *
+		 * @param cause of the failure
+		 */
+		void fail(Throwable cause);
+
+		/**
+		 * Gets the terminal state future which is completed once the payload
+		 * has reached a terminal state.
+		 *
+		 * @return Terminal state future
+		 */
+		CompletableFuture<?> getTerminalStateFuture();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
new file mode 100644
index 0000000..65bf2a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface for the context of a {@link LogicalSlot}. This context contains information
+ * about the underlying allocated slot and how to communicate with the TaskManager on which
+ * it was allocated.
+ */
+public interface SlotContext {
+	/**
+	 * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
+	 * physical slot.
+	 *
+	 * @return The id under whic teh slot has been allocated on the TaskManager
+	 */
+	AllocationID getAllocationId();
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	int getPhysicalSlotNumber();
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The gateway that can be used to send messages to the TaskManager.
+	 */
+	TaskManagerGateway getTaskManagerGateway();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java
new file mode 100644
index 0000000..9cc6f81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+	/**
+	 * Return the given slot to the slot owner.
+	 *
+	 * @param logicalSlot to return
+	 * @return Future which is completed with true if the slot could be returned, otherwise with false
+	 */
+	CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
new file mode 100644
index 0000000..d3fa775
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Request id identifying slot requests made by the {@link SlotProvider} towards the
+ * {@link SlotPool}.
+ */
+public final class SlotRequestId extends AbstractID {
+    private static final long serialVersionUID = -6072105912250154283L;
+
+    public SlotRequestId(long lowerPart, long upperPart) {
+        super(lowerPart, upperPart);
+    }
+
+    public SlotRequestId() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
new file mode 100644
index 0000000..a560ebc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code AllocatedSlot} represents a slot that the JobMaster allocated from a TaskExecutor.
+ * It represents a slice of allocated resources from the TaskExecutor.
+ * 
+ * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
+ * ResourceManager picks (or starts) a TaskExecutor that will then allocate the slot to the
+ * JobMaster and notify the JobMaster.
+ * 
+ * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
+ * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
+ * JobManager. All slots had a default unknown resource profile. 
+ */
+public class AllocatedSlot implements SlotContext {
+
+	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
+	private final AllocationID allocationId;
+
+	/** The location information of the TaskManager to which this slot belongs */
+	private final TaskManagerLocation taskManagerLocation;
+
+	/** The resource profile of the slot provides */
+	private final ResourceProfile resourceProfile;
+
+	/** RPC gateway to call the TaskManager that holds this slot */
+	private final TaskManagerGateway taskManagerGateway;
+
+	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
+	private final int physicalSlotNumber;
+
+	private final AtomicReference<Payload> payloadReference;
+
+	// ------------------------------------------------------------------------
+
+	public AllocatedSlot(
+			AllocationID allocationId,
+			TaskManagerLocation location,
+			int physicalSlotNumber,
+			ResourceProfile resourceProfile,
+			TaskManagerGateway taskManagerGateway) {
+		this.allocationId = checkNotNull(allocationId);
+		this.taskManagerLocation = checkNotNull(location);
+		this.physicalSlotNumber = physicalSlotNumber;
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
+
+		payloadReference = new AtomicReference<>(null);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 * 
+	 * @return The ID under which the slot is allocated
+	 */
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	/**
+	 * Gets the ID of the TaskManager on which this slot was allocated.
+	 * 
+	 * <p>This is equivalent to {@link #getTaskManagerLocation()#getTaskManagerId()}.
+	 * 
+	 * @return This slot's TaskManager's ID.
+	 */
+	public ResourceID getTaskManagerId() {
+		return getTaskManagerLocation().getResourceID();
+	}
+
+	/**
+	 * Gets the resource profile of the slot.
+	 *
+	 * @return The resource profile of the slot.
+	 */
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The actor gateway that can be used to send messages to the TaskManager.
+	 */
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+
+	/**
+	 * Returns the physical slot number of the allocated slot. The physical slot number corresponds
+	 * to the slot index on the TaskExecutor.
+	 *
+	 * @return Physical slot number of the allocated slot
+	 */
+	public int getPhysicalSlotNumber() {
+		return physicalSlotNumber;
+	}
+
+	/**
+	 * Returns true if this slot is not being used (e.g. a logical slot is allocated from this slot).
+	 *
+	 * @return true if a logical slot is allocated from this slot, otherwise false
+	 */
+	public boolean isUsed() {
+		return payloadReference.get() != null;
+	}
+
+	/**
+	 * Tries to assign the given payload to this allocated slot. This only works if there has not
+	 * been another payload assigned to this slot.
+	 *
+	 * @param payload to assign to this slot
+	 * @return true if the payload could be assigned, otherwise false
+	 */
+	public boolean tryAssignPayload(Payload payload) {
+		return payloadReference.compareAndSet(null, payload);
+	}
+
+	/**
+	 * Triggers the release of the assigned payload. If the payload could be released,
+	 * then it is removed from the slot.
+	 *
+	 * @param cause of the release operation
+	 * @return true if the payload could be released and was removed from the slot, otherwise false
+	 */
+	public boolean releasePayload(Throwable cause) {
+		final Payload payload = payloadReference.get();
+
+		if (payload != null) {
+			if (payload.release(cause)) {
+				payloadReference.set(null);
+
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			return true;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This always returns a reference hash code.
+	 */
+	@Override
+	public final int hashCode() {
+		return super.hashCode();
+	}
+
+	/**
+	 * This always checks based on reference equality.
+	 */
+	@Override
+	public final boolean equals(Object obj) {
+		return this == obj;
+	}
+
+	@Override
+	public String toString() {
+		return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+	}
+
+	// -----------------------------------------------------------------------
+	// Interfaces
+	// -----------------------------------------------------------------------
+
+	/**
+	 * Payload which can be assigned to an {@link AllocatedSlot}.
+	 */
+	interface Payload {
+
+		/**
+		 * Releases the payload. If the payload could be released, then it returns true,
+		 * otherwise false.
+		 *
+		 * @param cause of the payload release
+		 * @return true if the payload could be released, otherwise false
+		 */
+		boolean release(Throwable cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
new file mode 100644
index 0000000..045678e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for components which have to perform actions on allocated slots.
+ */
+public interface AllocatedSlotActions {
+
+	/**
+	 * Releases the slot with the given {@link SlotRequestId}. If the slot belonged to a
+	 * slot sharing group, then the corresponding {@link SlotSharingGroupId} has to be
+	 * provided. Additionally, one can provide a cause for the slot release.
+	 *
+	 * @param slotRequestId identifying the slot to release
+	 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
+	 * @param cause of the slot release, null if none
+	 * @return Acknowledge (future) after the slot has been released
+	 */
+	CompletableFuture<Acknowledge> releaseSlot(
+		SlotRequestId slotRequestId,
+		@Nullable SlotSharingGroupId slotSharingGroupId,
+		@Nullable Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
new file mode 100644
index 0000000..04b3ca6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * Map which stores values under two different indices.
+ *
+ * @param <A> Type of key A
+ * @param <B> Type of key B
+ * @param <V> Type of the value
+ */
+public class DualKeyMap<A, B, V> {
+
+	private final HashMap<A, Tuple2<B, V>> aMap;
+
+	private final HashMap<B, A> bMap;
+
+	private transient Collection<V> values;
+
+	public DualKeyMap(int initialCapacity) {
+		this.aMap = new HashMap<>(initialCapacity);
+		this.bMap = new HashMap<>(initialCapacity);
+	}
+
+	public int size() {
+		return aMap.size();
+	}
+
+	public V getKeyA(A aKey) {
+		final Tuple2<B, V> value = aMap.get(aKey);
+
+		if (value != null) {
+			return value.f1;
+		} else {
+			return null;
+		}
+	}
+
+	public V getKeyB(B bKey) {
+		final A aKey = bMap.get(bKey);
+
+		if (aKey != null) {
+			return aMap.get(aKey).f1;
+		} else {
+			return null;
+		}
+	}
+
+	public V put(A aKey, B bKey, V value) {
+		Tuple2<B, V> aValue = aMap.put(aKey, Tuple2.of(bKey, value));
+		bMap.put(bKey, aKey);
+
+		if (aValue != null) {
+			return aValue.f1;
+		} else {
+			return null;
+		}
+	}
+
+	public boolean containsKeyA(A aKey) {
+		return aMap.containsKey(aKey);
+	}
+
+	public boolean containsKeyB(B bKey) {
+		return bMap.containsKey(bKey);
+	}
+
+	public V removeKeyA(A aKey) {
+		Tuple2<B, V> aValue = aMap.remove(aKey);
+
+		if (aValue != null) {
+			bMap.remove(aValue.f0);
+			return aValue.f1;
+		} else {
+			return null;
+		}
+	}
+
+	public V removeKeyB(B bKey) {
+		A aKey = bMap.remove(bKey);
+
+		if (aKey != null) {
+			Tuple2<B, V> aValue = aMap.remove(aKey);
+			if (aValue != null) {
+				return aValue.f1;
+			} else {
+				return null;
+			}
+		} else {
+			return null;
+		}
+	}
+
+	public Collection<V> values() {
+		Collection<V> vs = values;
+
+		if (vs == null) {
+			vs = new Values();
+			values = vs;
+		}
+
+		return vs;
+	}
+
+	public void clear() {
+		aMap.clear();
+		bMap.clear();
+	}
+
+	// -----------------------------------------------------------------------
+	// Inner classes
+	// -----------------------------------------------------------------------
+
+	/**
+	 * Collection which contains the values of the dual key map.
+	 */
+	private final class Values extends AbstractCollection<V> {
+
+		@Override
+		public Iterator<V> iterator() {
+			return new ValueIterator();
+		}
+
+		@Override
+		public int size() {
+			return aMap.size();
+		}
+	}
+
+	/**
+	 * Iterator which iterates over the values of the dual key map.
+	 */
+	private final class ValueIterator implements Iterator<V> {
+
+		private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator();
+
+		@Override
+		public boolean hasNext() {
+			return iterator.hasNext();
+		}
+
+		@Override
+		public V next() {
+			Tuple2<B, V> value = iterator.next();
+
+			return value.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
new file mode 100644
index 0000000..9bd559b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Function;
+
+/**
+ * Implementation of the {@link LogicalSlot} which is used by the {@link SlotPool}.
+ */
+public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
+
+	private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, Payload> PAYLOAD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		SingleLogicalSlot.class,
+		Payload.class,
+		"payload");
+
+	private final SlotRequestId slotRequestId;
+
+	private final SlotContext slotContext;
+
+	// null if the logical slot does not belong to a slot sharing group, otherwise non-null
+	@Nullable
+	private final SlotSharingGroupId slotSharingGroupId;
+
+	// locality of this slot wrt the requested preferred locations
+	private final Locality locality;
+
+	// owner of this slot to which it is returned upon release
+	private final SlotOwner slotOwner;
+
+	// LogicalSlot.Payload of this slot
+	private volatile Payload payload;
+
+	public SingleLogicalSlot(
+			SlotRequestId slotRequestId,
+			SlotContext slotContext,
+			@Nullable SlotSharingGroupId slotSharingGroupId,
+			Locality locality,
+			SlotOwner slotOwner) {
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+		this.slotContext = Preconditions.checkNotNull(slotContext);
+		this.slotSharingGroupId = slotSharingGroupId;
+		this.locality = Preconditions.checkNotNull(locality);
+		this.slotOwner = Preconditions.checkNotNull(slotOwner);
+
+		payload = null;
+	}
+
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return slotContext.getTaskManagerLocation();
+	}
+
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return slotContext.getTaskManagerGateway();
+	}
+
+	@Override
+	public Locality getLocality() {
+		return locality;
+	}
+
+	@Override
+	public boolean isAlive() {
+		final Payload currentPayload = payload;
+
+		if (currentPayload != null) {
+			return !currentPayload.getTerminalStateFuture().isDone();
+		} else {
+			// We are always alive if there is no payload assigned yet.
+			// If this slot is released and no payload is assigned, then the TERMINATED_PAYLOAD is assigned
+			return true;
+		}
+	}
+
+	@Override
+	public boolean tryAssignPayload(Payload payload) {
+		Preconditions.checkNotNull(payload);
+		return PAYLOAD_UPDATER.compareAndSet(this, null, payload);
+	}
+
+	@Nullable
+	@Override
+	public Payload getPayload() {
+		return payload;
+	}
+
+	@Override
+	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
+		// set an already terminated payload if the payload of this slot is still empty
+		tryAssignPayload(TERMINATED_PAYLOAD);
+
+		// notify the payload that the slot will be released
+		payload.fail(cause);
+
+		// Wait until the payload has been terminated. Only then, we return the slot to its rightful owner
+		return payload.getTerminalStateFuture()
+			.handle((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this))
+			.thenApply(Function.identity());
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return slotContext.getPhysicalSlotNumber();
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return slotContext.getAllocationId();
+	}
+
+	@Override
+	public SlotRequestId getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	@Nullable
+	@Override
+	public SlotSharingGroupId getSlotSharingGroupId() {
+		return slotSharingGroupId;
+	}
+
+	// -------------------------------------------------------------------------
+	// AllocatedSlot.Payload implementation
+	// -------------------------------------------------------------------------
+
+	/**
+	 * A release of the payload by the {@link AllocatedSlot} triggers a release of the payload of
+	 * the logical slot.
+	 *
+	 * @param cause of the payload release
+	 * @return true if the logical slot's payload could be released, otherwise false
+	 */
+	@Override
+	public boolean release(Throwable cause) {
+		return releaseSlot(cause).isDone();
+	}
+}