You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:26 UTC
[09/11] flink git commit: [FLINK-7956] [flip6] Add support for queued
scheduling with slot sharing to SlotPool
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
new file mode 100644
index 0000000..4511bf6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+	/**
+	 * Payload which is already terminated: failing it has no effect and its terminal state
+	 * future is completed from the start. It can be assigned to a slot which is released
+	 * before a real payload has been set.
+	 */
+	Payload TERMINATED_PAYLOAD = new Payload() {
+
+		private final CompletableFuture<?> completedTerminationFuture = CompletableFuture.completedFuture(null);
+
+		@Override
+		public void fail(Throwable cause) {
+			// ignore
+		}
+
+		@Override
+		public CompletableFuture<?> getTerminalStateFuture() {
+			return completedTerminationFuture;
+		}
+	};
+
+	/**
+	 * Return the TaskManager location of this slot.
+	 *
+	 * @return TaskManager location of this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Return the TaskManager gateway to talk to the TaskManager.
+	 *
+	 * @return TaskManager gateway to talk to the TaskManager
+	 */
+	TaskManagerGateway getTaskManagerGateway();
+
+	/**
+	 * Gets the locality of this slot.
+	 *
+	 * @return locality of this slot
+	 */
+	Locality getLocality();
+
+	/**
+	 * True if the slot is alive and has not been released.
+	 *
+	 * @return True if the slot is alive, otherwise false if the slot is released
+	 */
+	boolean isAlive();
+
+	/**
+	 * Tries to assign a payload to this slot. One can only assign a single
+	 * payload once.
+	 *
+	 * @param payload to be assigned to this slot.
+	 * @return true if the payload could be assigned, otherwise false
+	 */
+	boolean tryAssignPayload(Payload payload);
+
+	/**
+	 * Returns the set payload or null if none.
+	 *
+	 * @return Payload of this slot or null if none
+	 */
+	@Nullable
+	Payload getPayload();
+
+	/**
+	 * Releases this slot without specifying a cause. Delegates to
+	 * {@link #releaseSlot(Throwable)} with a {@code null} cause.
+	 *
+	 * @return Future which is completed once the slot has been released,
+	 * 		in case of a failure it is completed exceptionally
+	 * @deprecated Use {@link #releaseSlot(Throwable)} instead, which additionally accepts the
+	 * 		cause of the release.
+	 */
+	@Deprecated
+	default CompletableFuture<?> releaseSlot() {
+		return releaseSlot(null);
+	}
+
+	/**
+	 * Releases this slot.
+	 *
+	 * @param cause why the slot was released or null if none
+	 * @return future which is completed once the slot has been released
+	 */
+	CompletableFuture<?> releaseSlot(@Nullable Throwable cause);
+
+	/**
+	 * Gets the slot number on the TaskManager.
+	 *
+	 * @return slot number
+	 */
+	int getPhysicalSlotNumber();
+
+	/**
+	 * Gets the allocation id of this slot.
+	 *
+	 * @return allocation id of this slot
+	 */
+	AllocationID getAllocationId();
+
+	/**
+	 * Gets the slot request id uniquely identifying the request with which this
+	 * slot has been allocated.
+	 *
+	 * @return Unique id identifying the slot request with which this slot was allocated
+	 */
+	SlotRequestId getSlotRequestId();
+
+	/**
+	 * Gets the slot sharing group id to which this slot belongs.
+	 *
+	 * @return slot sharing group id of this slot or null, if none.
+	 */
+	@Nullable
+	SlotSharingGroupId getSlotSharingGroupId();
+
+	/**
+	 * Payload for a logical slot.
+	 */
+	interface Payload {
+
+		/**
+		 * Fail the payload with the given cause.
+		 *
+		 * @param cause of the failure
+		 */
+		void fail(Throwable cause);
+
+		/**
+		 * Gets the terminal state future which is completed once the payload
+		 * has reached a terminal state.
+		 *
+		 * @return Terminal state future
+		 */
+		CompletableFuture<?> getTerminalStateFuture();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
new file mode 100644
index 0000000..65bf2a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface for the context of a {@link LogicalSlot}. This context contains information
+ * about the underlying allocated slot and how to communicate with the TaskManager on which
+ * it was allocated.
+ */
+public interface SlotContext {
+ /**
+ * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
+ * physical slot.
+ *
+ * @return The id under which the slot has been allocated on the TaskManager
+ */
+ AllocationID getAllocationId();
+
+ /**
+ * Gets the location info of the TaskManager that offers this slot.
+ *
+ * @return The location info of the TaskManager that offers this slot
+ */
+ TaskManagerLocation getTaskManagerLocation();
+
+ /**
+ * Gets the number of the slot.
+ *
+ * @return The number of the slot on the TaskManager.
+ */
+ int getPhysicalSlotNumber();
+
+ /**
+ * Gets the actor gateway that can be used to send messages to the TaskManager.
+ *
+ * <p>This method should be removed once the new interface-based RPC abstraction is in place.
+ *
+ * @return The gateway that can be used to send messages to the TaskManager.
+ */
+ TaskManagerGateway getTaskManagerGateway();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java
new file mode 100644
index 0000000..9cc6f81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotOwner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+ /**
+ * Returns the given logical slot to the slot owner. The outcome is reported asynchronously.
+ *
+ * @param logicalSlot the logical slot to return to this owner
+ * @return Future which is completed with true if the slot could be returned, otherwise with false
+ */
+ CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
new file mode 100644
index 0000000..d3fa775
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Request id identifying slot requests made by the {@link SlotProvider} towards the
+ * {@link SlotPool}.
+ */
+public final class SlotRequestId extends AbstractID {
+ private static final long serialVersionUID = -6072105912250154283L;
+
+ /**
+ * Creates a slot request id from the two given parts, which are passed unchanged to the
+ * {@link AbstractID} constructor.
+ *
+ * @param lowerPart lower part of the id
+ * @param upperPart upper part of the id
+ */
+ public SlotRequestId(long lowerPart, long upperPart) {
+ super(lowerPart, upperPart);
+ }
+
+ /**
+ * Creates a slot request id via the default {@link AbstractID} constructor.
+ * NOTE(review): presumably this generates a fresh random id - confirm against AbstractID.
+ */
+ public SlotRequestId() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
new file mode 100644
index 0000000..a560ebc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code AllocatedSlot} represents a slot that the JobMaster allocated from a TaskExecutor.
+ * It represents a slice of allocated resources from the TaskExecutor.
+ *
+ * <p>To allocate an {@code AllocatedSlot}, the JobMaster requests a slot from the ResourceManager.
+ * The ResourceManager picks (or starts) a TaskExecutor that will then allocate the slot to the
+ * JobMaster and notify the JobMaster.
+ *
+ * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
+ * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
+ * JobManager. All slots had a default unknown resource profile.
+ */
+public class AllocatedSlot implements SlotContext {
+
+	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
+	private final AllocationID allocationId;
+
+	/** The location information of the TaskManager to which this slot belongs. */
+	private final TaskManagerLocation taskManagerLocation;
+
+	/** The resource profile which the slot provides. */
+	private final ResourceProfile resourceProfile;
+
+	/** RPC gateway to call the TaskManager that holds this slot. */
+	private final TaskManagerGateway taskManagerGateway;
+
+	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
+	private final int physicalSlotNumber;
+
+	/** Holds the payload currently assigned to this slot; contains null while the slot is unused. */
+	private final AtomicReference<Payload> payloadReference;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new allocated slot without an assigned payload.
+	 *
+	 * @param allocationId id under which the slot is allocated, must not be null
+	 * @param location location of the TaskManager offering the slot, must not be null
+	 * @param physicalSlotNumber number of the slot on the TaskManager
+	 * @param resourceProfile resource profile of the slot, must not be null
+	 * @param taskManagerGateway gateway to talk to the TaskManager, must not be null
+	 */
+	public AllocatedSlot(
+			AllocationID allocationId,
+			TaskManagerLocation location,
+			int physicalSlotNumber,
+			ResourceProfile resourceProfile,
+			TaskManagerGateway taskManagerGateway) {
+		this.allocationId = checkNotNull(allocationId);
+		this.taskManagerLocation = checkNotNull(location);
+		this.physicalSlotNumber = physicalSlotNumber;
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
+
+		payloadReference = new AtomicReference<>(null);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 *
+	 * @return The ID under which the slot is allocated
+	 */
+	@Override
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	/**
+	 * Gets the ID of the TaskManager on which this slot was allocated.
+	 *
+	 * <p>This is equivalent to {@code getTaskManagerLocation().getResourceID()}.
+	 *
+	 * @return This slot's TaskManager's ID.
+	 */
+	public ResourceID getTaskManagerId() {
+		return getTaskManagerLocation().getResourceID();
+	}
+
+	/**
+	 * Gets the resource profile of the slot.
+	 *
+	 * @return The resource profile of the slot.
+	 */
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 *
+	 * <p>This method should be removed once the new interface-based RPC abstraction is in place.
+	 *
+	 * @return The actor gateway that can be used to send messages to the TaskManager.
+	 */
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+
+	/**
+	 * Returns the physical slot number of the allocated slot. The physical slot number corresponds
+	 * to the slot index on the TaskExecutor.
+	 *
+	 * @return Physical slot number of the allocated slot
+	 */
+	@Override
+	public int getPhysicalSlotNumber() {
+		return physicalSlotNumber;
+	}
+
+	/**
+	 * Returns true if this slot is being used, i.e. a payload (e.g. a logical slot allocated
+	 * from this slot) is currently assigned to it.
+	 *
+	 * @return true if a payload is assigned to this slot, otherwise false
+	 */
+	public boolean isUsed() {
+		return payloadReference.get() != null;
+	}
+
+	/**
+	 * Tries to assign the given payload to this allocated slot. This only works if there has not
+	 * been another payload assigned to this slot.
+	 *
+	 * @param payload to assign to this slot
+	 * @return true if the payload could be assigned, otherwise false
+	 */
+	public boolean tryAssignPayload(Payload payload) {
+		return payloadReference.compareAndSet(null, payload);
+	}
+
+	/**
+	 * Triggers the release of the assigned payload. If the payload could be released,
+	 * then it is removed from the slot. A slot without a payload counts as released.
+	 *
+	 * @param cause of the release operation
+	 * @return true if the payload could be released and was removed from the slot (or no payload
+	 * 		was assigned), otherwise false
+	 */
+	public boolean releasePayload(Throwable cause) {
+		final Payload payload = payloadReference.get();
+
+		if (payload == null) {
+			// nothing is assigned, hence there is nothing to release
+			return true;
+		}
+
+		if (payload.release(cause)) {
+			// only detach the payload once it confirmed its release
+			payloadReference.set(null);
+			return true;
+		}
+
+		return false;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This always returns a reference hash code.
+	 */
+	@Override
+	public final int hashCode() {
+		return super.hashCode();
+	}
+
+	/**
+	 * This always checks based on reference equality.
+	 */
+	@Override
+	public final boolean equals(Object obj) {
+		return this == obj;
+	}
+
+	@Override
+	public String toString() {
+		return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+	}
+
+	// -----------------------------------------------------------------------
+	// Interfaces
+	// -----------------------------------------------------------------------
+
+	/**
+	 * Payload which can be assigned to an {@link AllocatedSlot}.
+	 */
+	interface Payload {
+
+		/**
+		 * Releases the payload. If the payload could be released, then it returns true,
+		 * otherwise false.
+		 *
+		 * @param cause of the payload release
+		 * @return true if the payload could be released, otherwise false
+		 */
+		boolean release(Throwable cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
new file mode 100644
index 0000000..045678e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for components which have to perform actions on allocated slots.
+ */
+public interface AllocatedSlotActions {
+
+ /**
+ * Releases the slot with the given {@link SlotRequestId}. If the slot belonged to a
+ * slot sharing group, then the corresponding {@link SlotSharingGroupId} has to be
+ * provided. Additionally, one can provide a cause for the slot release.
+ *
+ * @param slotRequestId identifying the slot to release
+ * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
+ * @param cause of the slot release, null if none
+ * @return Future which is completed with an {@link Acknowledge} after the slot has been released
+ */
+ CompletableFuture<Acknowledge> releaseSlot(
+ SlotRequestId slotRequestId,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ @Nullable Throwable cause);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
new file mode 100644
index 0000000..04b3ca6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * Map which stores values under two different indices. Every value is registered under
+ * one key of type A and one key of type B and can be looked up or removed via either of them.
+ * The A-to-B association is kept one-to-one: registering a key which is already in use
+ * replaces the entry it belonged to.
+ *
+ * @param <A> Type of key A
+ * @param <B> Type of key B
+ * @param <V> Type of the value
+ */
+public class DualKeyMap<A, B, V> {
+
+	/** Primary index: key A to (key B, value). */
+	private final HashMap<A, Tuple2<B, V>> aMap;
+
+	/** Secondary index: key B to key A. */
+	private final HashMap<B, A> bMap;
+
+	/** Lazily created view on the values, see {@link #values()}. */
+	private transient Collection<V> values;
+
+	/**
+	 * Creates an empty dual key map.
+	 *
+	 * @param initialCapacity initial capacity of both underlying hash maps
+	 */
+	public DualKeyMap(int initialCapacity) {
+		this.aMap = new HashMap<>(initialCapacity);
+		this.bMap = new HashMap<>(initialCapacity);
+	}
+
+	/**
+	 * Returns the number of values stored in this map.
+	 *
+	 * @return number of entries
+	 */
+	public int size() {
+		return aMap.size();
+	}
+
+	/**
+	 * Looks up the value registered under the given A key.
+	 *
+	 * @param aKey key of type A
+	 * @return the value, or null if nothing is registered under the key
+	 */
+	public V getKeyA(A aKey) {
+		final Tuple2<B, V> value = aMap.get(aKey);
+
+		if (value != null) {
+			return value.f1;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Looks up the value registered under the given B key.
+	 *
+	 * @param bKey key of type B
+	 * @return the value, or null if nothing is registered under the key
+	 */
+	public V getKeyB(B bKey) {
+		final A aKey = bMap.get(bKey);
+
+		if (aKey != null) {
+			return aMap.get(aKey).f1;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Registers the value under the given pair of keys.
+	 *
+	 * <p>If one of the keys is already in use together with a different partner key, then the
+	 * stale association is removed so that both indices stay consistent: an entry which owned
+	 * {@code bKey} under another A key is evicted, and a B key which {@code aKey} was
+	 * previously registered with is no longer resolvable.
+	 *
+	 * @param aKey key of type A
+	 * @param bKey key of type B
+	 * @param value to store
+	 * @return the value previously registered under {@code aKey}, or null if there was none
+	 */
+	public V put(A aKey, B bKey, V value) {
+		// bKey may currently point to another A key; evict that entry, otherwise it would
+		// stay in aMap without a reverse mapping and inflate size()
+		if (bMap.containsKey(bKey)) {
+			final A previousOwner = bMap.get(bKey);
+
+			if (!isSameKey(previousOwner, aKey)) {
+				aMap.remove(previousOwner);
+			}
+		}
+
+		final Tuple2<B, V> previousEntry = aMap.put(aKey, Tuple2.of(bKey, value));
+
+		// aKey may have been registered with another B key; drop that reverse mapping,
+		// otherwise the old B key would resolve to the new value
+		if (previousEntry != null && !isSameKey(previousEntry.f0, bKey)) {
+			bMap.remove(previousEntry.f0);
+		}
+
+		bMap.put(bKey, aKey);
+
+		if (previousEntry != null) {
+			return previousEntry.f1;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Checks whether a value is registered under the given A key.
+	 *
+	 * @param aKey key of type A
+	 * @return true if the A key is known, otherwise false
+	 */
+	public boolean containsKeyA(A aKey) {
+		return aMap.containsKey(aKey);
+	}
+
+	/**
+	 * Checks whether a value is registered under the given B key.
+	 *
+	 * @param bKey key of type B
+	 * @return true if the B key is known, otherwise false
+	 */
+	public boolean containsKeyB(B bKey) {
+		return bMap.containsKey(bKey);
+	}
+
+	/**
+	 * Removes the entry registered under the given A key from both indices.
+	 *
+	 * @param aKey key of type A
+	 * @return the removed value, or null if nothing was registered under the key
+	 */
+	public V removeKeyA(A aKey) {
+		Tuple2<B, V> aValue = aMap.remove(aKey);
+
+		if (aValue != null) {
+			bMap.remove(aValue.f0);
+			return aValue.f1;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Removes the entry registered under the given B key from both indices.
+	 *
+	 * @param bKey key of type B
+	 * @return the removed value, or null if nothing was registered under the key
+	 */
+	public V removeKeyB(B bKey) {
+		A aKey = bMap.remove(bKey);
+
+		if (aKey != null) {
+			Tuple2<B, V> aValue = aMap.remove(aKey);
+			if (aValue != null) {
+				return aValue.f1;
+			} else {
+				return null;
+			}
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Returns a collection view on the values of this map. The view is created on first
+	 * access and reused afterwards; it reads through to the underlying map.
+	 *
+	 * @return collection of all stored values
+	 */
+	public Collection<V> values() {
+		Collection<V> vs = values;
+
+		if (vs == null) {
+			vs = new Values();
+			values = vs;
+		}
+
+		return vs;
+	}
+
+	/**
+	 * Removes all entries from both indices.
+	 */
+	public void clear() {
+		aMap.clear();
+		bMap.clear();
+	}
+
+	/**
+	 * Null-safe key comparison.
+	 */
+	private static boolean isSameKey(Object first, Object second) {
+		return first == null ? second == null : first.equals(second);
+	}
+
+	// -----------------------------------------------------------------------
+	// Inner classes
+	// -----------------------------------------------------------------------
+
+	/**
+	 * Collection which contains the values of the dual key map.
+	 */
+	private final class Values extends AbstractCollection<V> {
+
+		@Override
+		public Iterator<V> iterator() {
+			return new ValueIterator();
+		}
+
+		@Override
+		public int size() {
+			return aMap.size();
+		}
+	}
+
+	/**
+	 * Iterator which iterates over the values of the dual key map.
+	 */
+	private final class ValueIterator implements Iterator<V> {
+
+		private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator();
+
+		@Override
+		public boolean hasNext() {
+			return iterator.hasNext();
+		}
+
+		@Override
+		public V next() {
+			Tuple2<B, V> value = iterator.next();
+
+			return value.f1;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
new file mode 100644
index 0000000..9bd559b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Function;
+
+/**
+ * Implementation of the {@link LogicalSlot} which is used by the {@link SlotPool}.
+ *
+ * <p>The slot also acts as the {@link AllocatedSlot.Payload} of the allocated slot it was
+ * created from, so that a release of the allocated slot's payload is forwarded to the payload
+ * of this logical slot.
+ */
+public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
+
+	/** Updater for the {@code payload} field which makes the payload assignment a one-time CAS. */
+	private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, Payload> PAYLOAD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		SingleLogicalSlot.class,
+		Payload.class,
+		"payload");
+
+	private final SlotRequestId slotRequestId;
+
+	private final SlotContext slotContext;
+
+	// null if the logical slot does not belong to a slot sharing group, otherwise non-null
+	@Nullable
+	private final SlotSharingGroupId slotSharingGroupId;
+
+	// locality of this slot wrt the requested preferred locations
+	private final Locality locality;
+
+	// owner of this slot to which it is returned upon release
+	private final SlotOwner slotOwner;
+
+	// LogicalSlot.Payload of this slot; only ever changes from null to a non-null value
+	private volatile Payload payload;
+
+	/**
+	 * Creates a logical slot without an assigned payload.
+	 *
+	 * @param slotRequestId id of the request with which this slot was allocated, must not be null
+	 * @param slotContext context of the underlying allocated slot, must not be null
+	 * @param slotSharingGroupId id of the slot sharing group this slot belongs to, null if none
+	 * @param locality locality of this slot, must not be null
+	 * @param slotOwner owner to which the slot is returned upon release, must not be null
+	 */
+	public SingleLogicalSlot(
+			SlotRequestId slotRequestId,
+			SlotContext slotContext,
+			@Nullable SlotSharingGroupId slotSharingGroupId,
+			Locality locality,
+			SlotOwner slotOwner) {
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+		this.slotContext = Preconditions.checkNotNull(slotContext);
+		this.slotSharingGroupId = slotSharingGroupId;
+		this.locality = Preconditions.checkNotNull(locality);
+		this.slotOwner = Preconditions.checkNotNull(slotOwner);
+
+		payload = null;
+	}
+
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return slotContext.getTaskManagerLocation();
+	}
+
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return slotContext.getTaskManagerGateway();
+	}
+
+	@Override
+	public Locality getLocality() {
+		return locality;
+	}
+
+	@Override
+	public boolean isAlive() {
+		final Payload currentPayload = payload;
+
+		if (currentPayload != null) {
+			return !currentPayload.getTerminalStateFuture().isDone();
+		} else {
+			// We are always alive if there is no payload assigned yet.
+			// If this slot is released and no payload is assigned, then the TERMINATED_PAYLOAD is assigned
+			return true;
+		}
+	}
+
+	@Override
+	public boolean tryAssignPayload(Payload payload) {
+		Preconditions.checkNotNull(payload);
+		return PAYLOAD_UPDATER.compareAndSet(this, null, payload);
+	}
+
+	@Nullable
+	@Override
+	public Payload getPayload() {
+		return payload;
+	}
+
+	/**
+	 * Fails the payload of this slot and returns the slot to its owner once the payload has
+	 * reached a terminal state.
+	 *
+	 * @param cause why the slot was released or null if none
+	 * @return Future which is completed once the slot owner has processed the returned slot; it
+	 * 		is completed exceptionally if returning the slot failed
+	 */
+	@Override
+	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
+		// set an already terminated payload if the payload of this slot is still empty
+		tryAssignPayload(TERMINATED_PAYLOAD);
+
+		// after the assignment attempt above the payload is non-null and never changes again
+		final Payload currentPayload = payload;
+
+		// notify the payload that the slot will be released
+		currentPayload.fail(cause);
+
+		// Wait until the payload has been terminated. Only then, we return the slot to its rightful owner.
+		// thenCompose flattens the owner's future so that the result reflects the actual return of the slot
+		return currentPayload.getTerminalStateFuture()
+			.handle((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this))
+			.thenCompose(Function.identity());
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return slotContext.getPhysicalSlotNumber();
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return slotContext.getAllocationId();
+	}
+
+	@Override
+	public SlotRequestId getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	@Nullable
+	@Override
+	public SlotSharingGroupId getSlotSharingGroupId() {
+		return slotSharingGroupId;
+	}
+
+	// -------------------------------------------------------------------------
+	// AllocatedSlot.Payload implementation
+	// -------------------------------------------------------------------------
+
+	/**
+	 * A release of the payload by the {@link AllocatedSlot} triggers a release of the payload of
+	 * the logical slot.
+	 *
+	 * @param cause of the payload release
+	 * @return true if the logical slot's payload could be released, otherwise false
+	 */
+	@Override
+	public boolean release(Throwable cause) {
+		releaseSlot(cause);
+
+		// The result only depends on the payload having terminated, not on the (possibly
+		// asynchronous) return of the slot to its owner. releaseSlot guarantees a non-null payload.
+		return payload.getTerminalStateFuture().isDone();
+	}
+}