You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:20 UTC

[03/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
new file mode 100644
index 0000000..d5460eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SCHEDULER;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SLOT_POOL;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+
+/**
+ * Test base for scheduler related test cases. The tests are
+ * executed with the {@link Scheduler} and the {@link SlotPool}.
+ */
+public class SchedulerTestBase extends TestLogger {
+
+	protected TestingSlotProvider testingSlotProvider;
+
+	private SchedulerType schedulerType;
+
+	private RpcService rpcService;
+
+	enum SchedulerType {
+		SCHEDULER,
+		SLOT_POOL
+	}
+
+	@Parameterized.Parameters(name = "Scheduler type = {0}")
+	public static Collection<Object[]> schedulerTypes() {
+		return Arrays.asList(
+			new Object[]{SCHEDULER},
+			new Object[]{SLOT_POOL});
+	}
+
+	protected SchedulerTestBase(SchedulerType schedulerType) {
+		this.schedulerType = Preconditions.checkNotNull(schedulerType);
+		rpcService = null;
+	}
+
+	@Before
+	public void setup() throws Exception {
+		switch (schedulerType) {
+			case SCHEDULER:
+				testingSlotProvider = new TestingSchedulerSlotProvider(
+					new Scheduler(
+						TestingUtils.defaultExecutionContext()));
+				break;
+			case SLOT_POOL:
+				rpcService = new TestingRpcService();
+				final JobID jobId = new JobID();
+				final TestingSlotPool slotPool = new TestingSlotPool(rpcService, jobId);
+				testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);
+
+				final JobMasterId jobMasterId = JobMasterId.generate();
+				final String jobManagerAddress = "localhost";
+				slotPool.start(jobMasterId, jobManagerAddress);
+				break;
+		}
+	}
+
+	@After
+	public void teardown() throws Exception {
+		if (testingSlotProvider != null) {
+			testingSlotProvider.shutdown();
+			testingSlotProvider = null;
+		}
+
+		if (rpcService != null) {
+			rpcService.stopService();
+			rpcService = null;
+		}
+	}
+
+	protected interface TestingSlotProvider extends SlotProvider {
+		TaskManagerLocation addTaskManager(int numberSlots);
+
+		void releaseTaskManager(ResourceID resourceId);
+
+		int getNumberOfAvailableSlots();
+
+		int getNumberOfLocalizedAssignments();
+
+		int getNumberOfNonLocalizedAssignments();
+
+		int getNumberOfUnconstrainedAssignments();
+
+		int getNumberOfHostLocalizedAssignments();
+
+		int getNumberOfSlots(SlotSharingGroup slotSharingGroup);
+
+		int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId);
+
+		void shutdown() throws Exception;
+	}
+
+	private static final class TestingSchedulerSlotProvider implements TestingSlotProvider {
+		private final Scheduler scheduler;
+
+		private TestingSchedulerSlotProvider(Scheduler scheduler) {
+			this.scheduler = Preconditions.checkNotNull(scheduler);
+		}
+
+		@Override
+		public CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations) {
+			return scheduler.allocateSlot(task, allowQueued, preferredLocations);
+		}
+
+		@Override
+		public TaskManagerLocation addTaskManager(int numberSlots) {
+			final Instance instance = getRandomInstance(numberSlots);
+			scheduler.newInstanceAvailable(instance);
+
+			return instance.getTaskManagerLocation();
+		}
+
+		@Override
+		public void releaseTaskManager(ResourceID resourceId) {
+			final Instance instance = scheduler.getInstance(resourceId);
+
+			if (instance != null) {
+				scheduler.instanceDied(instance);
+			}
+		}
+
+		@Override
+		public int getNumberOfAvailableSlots() {
+			return scheduler.getNumberOfAvailableSlots();
+		}
+
+		@Override
+		public int getNumberOfLocalizedAssignments() {
+			return scheduler.getNumberOfLocalizedAssignments();
+		}
+
+		@Override
+		public int getNumberOfNonLocalizedAssignments() {
+			return scheduler.getNumberOfNonLocalizedAssignments();
+		}
+
+		@Override
+		public int getNumberOfUnconstrainedAssignments() {
+			return scheduler.getNumberOfUnconstrainedAssignments();
+		}
+
+		@Override
+		public int getNumberOfHostLocalizedAssignments() {
+			return 0;
+		}
+
+		@Override
+		public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
+			return slotSharingGroup.getTaskAssignment().getNumberOfSlots();
+		}
+
+		@Override
+		public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
+			return slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexId);
+		}
+
+		@Override
+		public void shutdown() {
+			scheduler.shutdown();
+		}
+	}
+
+	private static final class TestingSlotPoolSlotProvider implements TestingSlotProvider {
+
+		private final TestingSlotPool slotPool;
+
+		private final SlotProvider slotProvider;
+
+		private final AtomicInteger numberOfLocalizedAssignments;
+
+		private final AtomicInteger numberOfNonLocalizedAssignments;
+
+		private final AtomicInteger numberOfUnconstrainedAssignments;
+
+		private final AtomicInteger numberOfHostLocalizedAssignments;
+
+		private TestingSlotPoolSlotProvider(TestingSlotPool slotPool) {
+			this.slotPool = Preconditions.checkNotNull(slotPool);
+			this.slotProvider = slotPool.getSlotProvider();
+
+			this.numberOfLocalizedAssignments = new AtomicInteger();
+			this.numberOfNonLocalizedAssignments = new AtomicInteger();
+			this.numberOfUnconstrainedAssignments = new AtomicInteger();
+			this.numberOfHostLocalizedAssignments = new AtomicInteger();
+		}
+
+		@Override
+		public TaskManagerLocation addTaskManager(int numberSlots) {
+			final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final ResourceID resourceId = taskManagerLocation.getResourceID();
+			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+			try {
+				slotPoolGateway.registerTaskManager(resourceId).get();
+			} catch (Exception e) {
+				throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
+			}
+
+			final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+			final Collection<SlotOffer> slotOffers = new ArrayList<>(numberSlots);
+
+			for (int i = 0; i < numberSlots; i++) {
+				final SlotOffer slotOffer = new SlotOffer(
+					new AllocationID(),
+					i,
+					ResourceProfile.UNKNOWN);
+
+				slotOffers.add(slotOffer);
+			}
+
+			final Collection<SlotOffer> acceptedSlotOffers;
+
+			try {
+				acceptedSlotOffers = slotPoolGateway.offerSlots(
+					taskManagerLocation,
+					taskManagerGateway,
+					slotOffers).get();
+			} catch (Exception e) {
+				throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
+			}
+
+			Preconditions.checkState(acceptedSlotOffers.size() == numberSlots);
+
+			return taskManagerLocation;
+		}
+
+		@Override
+		public void releaseTaskManager(ResourceID resourceId) {
+			try {
+				slotPool.releaseTaskManager(resourceId).get();
+			} catch (Exception e) {
+				throw new RuntimeException("Should not have happened.", e);
+			}
+		}
+
+		@Override
+		public int getNumberOfAvailableSlots() {
+			try {
+				return slotPool.getNumberOfAvailableSlots().get();
+			} catch (Exception e) {
+				throw new RuntimeException("Should not have happened.", e);
+			}
+		}
+
+		@Override
+		public int getNumberOfLocalizedAssignments() {
+			return numberOfLocalizedAssignments.get();
+		}
+
+		@Override
+		public int getNumberOfNonLocalizedAssignments() {
+			return numberOfNonLocalizedAssignments.get();
+		}
+
+		@Override
+		public int getNumberOfUnconstrainedAssignments() {
+			return numberOfUnconstrainedAssignments.get();
+		}
+
+		@Override
+		public int getNumberOfHostLocalizedAssignments() {
+			return numberOfHostLocalizedAssignments.get();
+		}
+
+		@Override
+		public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
+			try {
+				return slotPool.getNumberOfSharedSlots(slotSharingGroup.getSlotSharingGroupId()).get();
+			} catch (Exception e) {
+				throw new RuntimeException("Should not have happened.", e);
+			}
+		}
+
+		@Override
+		public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
+			try {
+				return slotPool.getNumberOfAvailableSlotsForGroup(slotSharingGroup.getSlotSharingGroupId(), jobVertexId).get();
+			} catch (Exception e) {
+				throw new RuntimeException("Should not have happened.", e);
+			}
+		}
+
+		@Override
+		public void shutdown() throws Exception {
+			RpcUtils.terminateRpcEndpoint(slotPool, TestingUtils.TIMEOUT());
+		}
+
+		@Override
+		public CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations) {
+			return slotProvider.allocateSlot(task, allowQueued, preferredLocations).thenApply(
+				(LogicalSlot logicalSlot) -> {
+					switch (logicalSlot.getLocality()) {
+						case LOCAL:
+							numberOfLocalizedAssignments.incrementAndGet();
+							break;
+						case UNCONSTRAINED:
+							numberOfUnconstrainedAssignments.incrementAndGet();
+							break;
+						case NON_LOCAL:
+							numberOfNonLocalizedAssignments.incrementAndGet();
+							break;
+						case HOST_LOCAL:
+							numberOfHostLocalizedAssignments.incrementAndGet();
+							break;
+						default:
+							// ignore
+					}
+
+					return logicalSlot;
+				});
+		}
+	}
+
+	private static final class TestingSlotPool extends SlotPool {
+
+		public TestingSlotPool(RpcService rpcService, JobID jobId) {
+			super(rpcService, jobId);
+		}
+
+		CompletableFuture<Integer> getNumberOfAvailableSlots() {
+			return callAsync(
+				() -> getAvailableSlots().size(),
+				TestingUtils.infiniteTime());
+		}
+
+		CompletableFuture<Integer> getNumberOfSharedSlots(SlotSharingGroupId slotSharingGroupId) {
+			return callAsync(
+				() -> {
+					final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+
+					if (multiTaskSlotManager != null) {
+						return multiTaskSlotManager.getResolvedRootSlots().size();
+					} else {
+						throw new FlinkException("No MultiTaskSlotManager registered under " + slotSharingGroupId + '.');
+					}
+				},
+				TestingUtils.infiniteTime());
+		}
+
+		CompletableFuture<Integer> getNumberOfAvailableSlotsForGroup(SlotSharingGroupId slotSharingGroupId, JobVertexID jobVertexId) {
+			return callAsync(
+				() -> {
+					final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+
+					if (multiTaskSlotManager != null) {
+						int availableSlots = 0;
+
+						for (SlotSharingManager.MultiTaskSlot multiTaskSlot : multiTaskSlotManager.getResolvedRootSlots()) {
+							if (!multiTaskSlot.contains(jobVertexId)) {
+								availableSlots++;
+							}
+						}
+
+						return availableSlots;
+					} else {
+						throw new FlinkException("No MultiTaskSlotmanager registered under " + slotSharingGroupId + '.');
+					}
+				},
+				TestingUtils.infiniteTime());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 98dca03..4186255 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -18,9 +18,18 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -33,17 +42,9 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class SchedulerTestUtils {
@@ -83,9 +84,13 @@ public class SchedulerTestUtils {
 	
 	
 	public static Execution getDummyTask() {
+		ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
+
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		when(vertex.getJobVertex()).thenReturn(executionJobVertex);
+		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
 		
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);
@@ -117,11 +122,14 @@ public class SchedulerTestUtils {
 	}
 
 	public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
+		ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 
 		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		when(vertex.getJobVertex()).thenReturn(executionJobVertex);
+		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
 
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);
@@ -130,9 +138,11 @@ public class SchedulerTestUtils {
 		return execution;
 	}
 	
-	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
+	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks, SlotSharingGroup slotSharingGroup) {
+		ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
-		
+
+		when(executionJobVertex.getSlotSharingGroup()).thenReturn(slotSharingGroup);
 		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Collections.emptyList());
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
@@ -141,6 +151,7 @@ public class SchedulerTestUtils {
 		when(vertex.getMaxParallelism()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("TEST-VERTEX");
+		when(vertex.getJobVertex()).thenReturn(executionJobVertex);
 		
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);
@@ -149,17 +160,26 @@ public class SchedulerTestUtils {
 	}
 
 	public static Execution getTestVertexWithLocation(
-			JobVertexID jid, int taskIndex, int numTasks, TaskManagerLocation... locations) {
+			JobVertexID jid,
+			int taskIndex,
+			int numTasks,
+			SlotSharingGroup slotSharingGroup,
+			TaskManagerLocation... locations) {
+
+		ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
+
+		when(executionJobVertex.getSlotSharingGroup()).thenReturn(slotSharingGroup);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-		Collection<CompletableFuture<TaskManagerLocation>> preferrecLocationFutures = new ArrayList<>(locations.length);
+		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(locations.length);
 
 		for (TaskManagerLocation location : locations) {
-			preferrecLocationFutures.add(CompletableFuture.completedFuture(location));
+			preferredLocationFutures.add(CompletableFuture.completedFuture(location));
 		}
 
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferrecLocationFutures);
+		when(vertex.getJobVertex()).thenReturn(executionJobVertex);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
index 6d17ad0..add1ec2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 
 import java.util.concurrent.CompletableFuture;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
index e7f9485..727c0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
new file mode 100644
index 0000000..e20700e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Simple logical slot for testing purposes.
+ */
+public class TestingLogicalSlot implements LogicalSlot {
+
+	private final TaskManagerLocation taskManagerLocation;
+
+	private final TaskManagerGateway taskManagerGateway;
+
+	private final AtomicReference<Payload> payloadReference;
+
+	private final int slotNumber;
+
+	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+	
+	private final AllocationID allocationId;
+
+	private final SlotRequestId slotRequestId;
+
+	private final SlotSharingGroupId slotSharingGroupId;
+
+	public TestingLogicalSlot() {
+		this(
+			new LocalTaskManagerLocation(),
+			new SimpleAckingTaskManagerGateway(),
+			0,
+			new AllocationID(),
+			new SlotRequestId(),
+			new SlotSharingGroupId());
+	}
+
+	public TestingLogicalSlot(
+			TaskManagerLocation taskManagerLocation,
+			TaskManagerGateway taskManagerGateway,
+			int slotNumber,
+			AllocationID allocationId,
+			SlotRequestId slotRequestId,
+			SlotSharingGroupId slotSharingGroupId) {
+		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+		this.payloadReference = new AtomicReference<>();
+		this.slotNumber = slotNumber;
+		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
+	}
+
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+
+	@Override
+	public Locality getLocality() {
+		return Locality.UNKNOWN;
+	}
+
+	@Override
+	public boolean isAlive() {
+		return !releaseFuture.isDone();
+	}
+
+	@Override
+	public boolean tryAssignPayload(Payload payload) {
+		return payloadReference.compareAndSet(null, payload);
+	}
+
+	@Nullable
+	@Override
+	public Payload getPayload() {
+		return payloadReference.get();
+	}
+
+	@Override
+	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
+		releaseFuture.complete(null);
+
+		return releaseFuture;
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return slotNumber;
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	@Override
+	public SlotRequestId getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	@Nullable
+	@Override
+	public SlotSharingGroupId getSlotSharingGroupId() {
+		return slotSharingGroupId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java
new file mode 100644
index 0000000..a59f765
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple payload implementation for testing purposes.
+ */
+public class TestingPayload implements LogicalSlot.Payload {
+
+	private final CompletableFuture<?> terminationFuture;
+
+	public TestingPayload() {
+		this.terminationFuture = new CompletableFuture<>();
+	}
+
+
+	@Override
+	public void fail(Throwable cause) {
+		terminationFuture.complete(null);
+	}
+
+	@Override
+	public CompletableFuture<?> getTerminalStateFuture() {
+		return terminationFuture;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
new file mode 100644
index 0000000..4dee924
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests adding, looking up and removing slots in {@link SlotPool.AllocatedSlots}. */
+public class AllocatedSlotsTest extends TestLogger {
+	@Test
+	public void testOperations() throws Exception {
+		final SlotPool.AllocatedSlots slots = new SlotPool.AllocatedSlots();
+
+		final AllocationID firstAllocationId = new AllocationID();
+		final SlotRequestId firstRequestId = new SlotRequestId();
+		final TaskManagerLocation firstLocation = new LocalTaskManagerLocation();
+		final ResourceID firstResourceId = firstLocation.getResourceID();
+		final AllocatedSlot firstSlot = createSlot(firstAllocationId, firstLocation);
+
+		slots.add(firstRequestId, firstSlot);
+		// one slot, located on the first task manager
+		assertTrue(slots.contains(firstSlot.getAllocationId()));
+		assertTrue(slots.containResource(firstResourceId));
+
+		assertEquals(firstSlot, slots.get(firstAllocationId));
+		assertEquals(1, slots.getSlotsForTaskManager(firstResourceId).size());
+		assertEquals(1, slots.size());
+
+		final AllocationID secondAllocationId = new AllocationID();
+		final SlotRequestId secondRequestId = new SlotRequestId();
+		final AllocatedSlot secondSlot = createSlot(secondAllocationId, firstLocation);
+
+		slots.add(secondRequestId, secondSlot);
+		// two slots, both located on the first task manager
+		assertTrue(slots.contains(firstSlot.getAllocationId()));
+		assertTrue(slots.contains(secondSlot.getAllocationId()));
+		assertTrue(slots.containResource(firstResourceId));
+
+		assertEquals(firstSlot, slots.get(firstAllocationId));
+		assertEquals(secondSlot, slots.get(secondAllocationId));
+		assertEquals(2, slots.getSlotsForTaskManager(firstResourceId).size());
+		assertEquals(2, slots.size());
+
+		final AllocationID thirdAllocationId = new AllocationID();
+		final SlotRequestId thirdRequestId = new SlotRequestId();
+		final TaskManagerLocation secondLocation = new LocalTaskManagerLocation();
+		final ResourceID secondResourceId = secondLocation.getResourceID();
+		final AllocatedSlot thirdSlot = createSlot(thirdAllocationId, secondLocation);
+
+		slots.add(thirdRequestId, thirdSlot);
+		// three slots spread over two task managers
+		assertTrue(slots.contains(firstSlot.getAllocationId()));
+		assertTrue(slots.contains(secondSlot.getAllocationId()));
+		assertTrue(slots.contains(thirdSlot.getAllocationId()));
+		assertTrue(slots.containResource(firstResourceId));
+		assertTrue(slots.containResource(secondResourceId));
+
+		assertEquals(firstSlot, slots.get(firstAllocationId));
+		assertEquals(secondSlot, slots.get(secondAllocationId));
+		assertEquals(thirdSlot, slots.get(thirdAllocationId));
+		assertEquals(2, slots.getSlotsForTaskManager(firstResourceId).size());
+		assertEquals(1, slots.getSlotsForTaskManager(secondResourceId).size());
+		assertEquals(3, slots.size());
+
+		slots.remove(secondSlot.getAllocationId());
+		// the second slot is gone; each task manager still holds one slot
+		assertTrue(slots.contains(firstSlot.getAllocationId()));
+		assertFalse(slots.contains(secondSlot.getAllocationId()));
+		assertTrue(slots.contains(thirdSlot.getAllocationId()));
+		assertTrue(slots.containResource(firstResourceId));
+		assertTrue(slots.containResource(secondResourceId));
+
+		assertEquals(firstSlot, slots.get(firstAllocationId));
+		assertNull(slots.get(secondAllocationId));
+		assertEquals(thirdSlot, slots.get(thirdAllocationId));
+		assertEquals(1, slots.getSlotsForTaskManager(firstResourceId).size());
+		assertEquals(1, slots.getSlotsForTaskManager(secondResourceId).size());
+		assertEquals(2, slots.size());
+
+		slots.remove(firstSlot.getAllocationId());
+		// the first task manager no longer holds any slot
+		assertFalse(slots.contains(firstSlot.getAllocationId()));
+		assertFalse(slots.contains(secondSlot.getAllocationId()));
+		assertTrue(slots.contains(thirdSlot.getAllocationId()));
+		assertFalse(slots.containResource(firstResourceId));
+		assertTrue(slots.containResource(secondResourceId));
+
+		assertNull(slots.get(firstAllocationId));
+		assertNull(slots.get(secondAllocationId));
+		assertEquals(thirdSlot, slots.get(thirdAllocationId));
+		assertEquals(0, slots.getSlotsForTaskManager(firstResourceId).size());
+		assertEquals(1, slots.getSlotsForTaskManager(secondResourceId).size());
+		assertEquals(1, slots.size());
+
+		slots.remove(thirdSlot.getAllocationId());
+		// nothing is left
+		assertFalse(slots.contains(firstSlot.getAllocationId()));
+		assertFalse(slots.contains(secondSlot.getAllocationId()));
+		assertFalse(slots.contains(thirdSlot.getAllocationId()));
+		assertFalse(slots.containResource(firstResourceId));
+		assertFalse(slots.containResource(secondResourceId));
+
+		assertNull(slots.get(firstAllocationId));
+		assertNull(slots.get(secondAllocationId));
+		assertNull(slots.get(thirdAllocationId));
+		assertEquals(0, slots.getSlotsForTaskManager(firstResourceId).size());
+		assertEquals(0, slots.getSlotsForTaskManager(secondResourceId).size());
+		assertEquals(0, slots.size());
+	}
+
+	private static AllocatedSlot createSlot(final AllocationID id, final TaskManagerLocation location) {
+		return new AllocatedSlot(
+			id,
+			location,
+			0,
+			ResourceProfile.UNKNOWN,
+			new SimpleAckingTaskManagerGateway());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
new file mode 100644
index 0000000..4835c57
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests adding, removing and polling slots in {@link SlotPool.AvailableSlots}. */
+public class AvailableSlotsTest extends TestLogger {
+	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+	/** Profile for which {@link #testPollFreeSlot()} expects no matching slot. */
+	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+
+	@Test
+	public void testAddAndRemove() throws Exception {
+		final SlotPool.AvailableSlots slots = new SlotPool.AvailableSlots();
+
+		final ResourceID firstTaskManager = new ResourceID("resource1");
+		final ResourceID secondTaskManager = new ResourceID("resource2");
+
+		final AllocatedSlot slotA = createAllocatedSlot(firstTaskManager);
+		final AllocatedSlot slotB = createAllocatedSlot(firstTaskManager);
+		final AllocatedSlot slotC = createAllocatedSlot(secondTaskManager);
+
+		slots.add(slotA, 1L);
+		slots.add(slotB, 2L);
+		slots.add(slotC, 3L);
+		// all three slots are known, on two task managers
+		assertEquals(3, slots.size());
+		assertTrue(slots.contains(slotA.getAllocationId()));
+		assertTrue(slots.contains(slotB.getAllocationId()));
+		assertTrue(slots.contains(slotC.getAllocationId()));
+		assertTrue(slots.containsTaskManager(firstTaskManager));
+		assertTrue(slots.containsTaskManager(secondTaskManager));
+
+		slots.removeAllForTaskManager(firstTaskManager);
+		// only the slot of the second task manager remains
+		assertEquals(1, slots.size());
+		assertFalse(slots.contains(slotA.getAllocationId()));
+		assertFalse(slots.contains(slotB.getAllocationId()));
+		assertTrue(slots.contains(slotC.getAllocationId()));
+		assertFalse(slots.containsTaskManager(firstTaskManager));
+		assertTrue(slots.containsTaskManager(secondTaskManager));
+
+		slots.removeAllForTaskManager(secondTaskManager);
+		// nothing is left
+		assertEquals(0, slots.size());
+		assertFalse(slots.contains(slotA.getAllocationId()));
+		assertFalse(slots.contains(slotB.getAllocationId()));
+		assertFalse(slots.contains(slotC.getAllocationId()));
+		assertFalse(slots.containsTaskManager(firstTaskManager));
+		assertFalse(slots.containsTaskManager(secondTaskManager));
+	}
+
+	@Test
+	public void testPollFreeSlot() {
+		final SlotPool.AvailableSlots slots = new SlotPool.AvailableSlots();
+
+		final ResourceID taskManager = new ResourceID("resource1");
+		final AllocatedSlot onlySlot = createAllocatedSlot(taskManager);
+
+		slots.add(onlySlot, 1L);
+
+		assertEquals(1, slots.size());
+		assertTrue(slots.contains(onlySlot.getAllocationId()));
+		assertTrue(slots.containsTaskManager(taskManager));
+		// polling with the bigger profile must not hand out the slot
+		assertNull(slots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
+
+		final SlotAndLocality polled = slots.poll(DEFAULT_TESTING_PROFILE, null);
+		assertEquals(onlySlot, polled.getSlot());
+		assertEquals(0, slots.size());
+		assertFalse(slots.contains(onlySlot.getAllocationId()));
+		assertFalse(slots.containsTaskManager(taskManager));
+	}
+
+	static AllocatedSlot createAllocatedSlot(final ResourceID resourceId) {
+		final TaskManagerLocation location = mock(TaskManagerLocation.class);
+		when(location.getResourceID()).thenReturn(resourceId);
+
+		// No behaviour is stubbed on the gateway mock.
+		final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
+		return new AllocatedSlot(
+			new AllocationID(),
+			location,
+			0,
+			DEFAULT_TESTING_PROFILE,
+			gateway);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
new file mode 100644
index 0000000..7454e3e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link CoLocationConstraint} with the {@link SlotPool}.
+ */
+public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase {
+
+	/**
+	 * Tests that equal co-location constraints share an allocation and different ones do not.
+	 */
+	@Test
+	public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, InterruptedException {
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+		// Record the allocation id of each slot request seen by the testing resource manager gateway.
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+		// Constraints for index 0 and index 1 of the same co-location group.
+		CoLocationGroup group = new CoLocationGroup();
+		CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+		CoLocationConstraint coLocationConstraint2 = group.getLocationConstraint(1);
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+		// Naming: logicalSlotFutureXY is the request with constraint X for job vertex Y.
+		CompletableFuture<LogicalSlot> logicalSlotFuture11 = slotProvider.allocateSlot(
+			new ScheduledUnit(
+				jobVertexId1,
+				slotSharingGroupId,
+				coLocationConstraint1),
+			true,
+			Collections.emptyList());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture22 = slotProvider.allocateSlot(
+			new ScheduledUnit(
+				jobVertexId2,
+				slotSharingGroupId,
+				coLocationConstraint2),
+			true,
+			Collections.emptyList());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture12 = slotProvider.allocateSlot(
+			new ScheduledUnit(
+				jobVertexId2,
+				slotSharingGroupId,
+				coLocationConstraint1),
+			true,
+			Collections.emptyList());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture21 = slotProvider.allocateSlot(
+			new ScheduledUnit(
+				jobVertexId1,
+				slotSharingGroupId,
+				coLocationConstraint2),
+			true,
+			Collections.emptyList());
+		// Exactly two allocation ids are taken; take() blocks until each one has been recorded.
+		final AllocationID allocationId1 = allocationIds.take();
+		final AllocationID allocationId2 = allocationIds.take();
+		// Answer both recorded requests with slot offers from the single registered task manager.
+		CompletableFuture<Boolean> slotOfferFuture1 = slotPoolGateway.offerSlot(
+			taskManagerLocation,
+			new SimpleAckingTaskManagerGateway(),
+			new SlotOffer(
+				allocationId1,
+				0,
+				ResourceProfile.UNKNOWN));
+
+		CompletableFuture<Boolean> slotOfferFuture2 = slotPoolGateway.offerSlot(
+			taskManagerLocation,
+			new SimpleAckingTaskManagerGateway(),
+			new SlotOffer(
+				allocationId2,
+				0,
+				ResourceProfile.UNKNOWN));
+
+		assertTrue(slotOfferFuture1.get());
+		assertTrue(slotOfferFuture2.get());
+
+		LogicalSlot logicalSlot11 = logicalSlotFuture11.get();
+		LogicalSlot logicalSlot12 = logicalSlotFuture12.get();
+		LogicalSlot logicalSlot21 = logicalSlotFuture21.get();
+		LogicalSlot logicalSlot22 = logicalSlotFuture22.get();
+		// Same constraint => same allocation id; different constraints => different allocation ids.
+		assertEquals(logicalSlot11.getAllocationId(), logicalSlot12.getAllocationId());
+		assertEquals(logicalSlot21.getAllocationId(), logicalSlot22.getAllocationId());
+		assertNotEquals(logicalSlot11.getAllocationId(), logicalSlot21.getAllocationId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
new file mode 100644
index 0000000..2d862c5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.pattern.AskTimeoutException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SlotPool} running on an Akka-backed {@link RpcService}.
+ */
+public class SlotPoolRpcTest extends TestLogger {
+
+	private static RpcService rpcService;
+	// Used for terminating the endpoints and for the callAsync accessors of TestingSlotPool.
+	private static final Time timeout = Time.seconds(10L);
+
+	// ------------------------------------------------------------------------
+	//  setup
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() {
+		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+		rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+	}
+
+	@AfterClass
+	public static  void shutdown() {
+		if (rpcService != null) {
+			rpcService.stopService();
+			rpcService = null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testSlotAllocationNoResourceManager() throws Exception {
+		final JobID jid = new JobID();
+		// Time arguments in order (cf. TestingSlotPool): slot request, RM allocation, RM request.
+		final SlotPool pool = new SlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			Time.milliseconds(10L) // this is the timeout for the request tested here
+		);
+
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			// Note: no resource manager is connected in this test.
+			CompletableFuture<LogicalSlot> future = pool.allocateSlot(
+				new SlotRequestId(),
+				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				true,
+				TestingUtils.infiniteTime());
+
+			try {
+				future.get();
+				fail("We expected an ExecutionException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof NoResourceAvailableException);
+			}
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
+	}
+
+	@Test
+	public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
+		final JobID jid = new JobID();
+
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
+
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+			// The gateway call below (10 ms) is expected to fail with an AskTimeoutException.
+			SlotRequestId requestId = new SlotRequestId();
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
+				requestId,
+				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				true,
+				Time.milliseconds(10L));
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+			// The timed-out request is still tracked as waiting for a resource manager ...
+			assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+			// ... until it is released explicitly.
+			slotPoolGateway.releaseSlot(requestId, null, null).get();
+
+			assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
+	}
+
+	@Test
+	public void testCancelSlotAllocationWithResourceManager() throws Exception {
+		final JobID jid = new JobID();
+
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
+
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			pool.connectToResourceManager(resourceManagerGateway);
+
+			SlotRequestId requestId = new SlotRequestId();
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
+				requestId,
+				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				true,
+				Time.milliseconds(10L));
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+			// With a resource manager connected, the timed-out request stays pending until released.
+			assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
+
+			slotPoolGateway.releaseSlot(requestId, null, null).get();
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
+	}
+
+	/**
+	 * Tests that releasing a request after its slot was offered makes the slot available again.
+	 */
+	@Test
+	public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
+		final JobID jid = new JobID();
+
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
+
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+			final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+
+			TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			resourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+			pool.connectToResourceManager(resourceManagerGateway);
+
+			SlotRequestId requestId = new SlotRequestId();
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
+				requestId,
+				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				true,
+				Time.milliseconds(10L));
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+			// After the timeout, offer a slot for the allocation id the resource manager was asked for.
+			AllocationID allocationId = allocationIdFuture.get();
+			final SlotOffer slotOffer = new SlotOffer(
+				allocationId,
+				0,
+				DEFAULT_TESTING_PROFILE);
+			final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
+			// The offer is accepted and settles the pending request although the caller timed out.
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+
+			assertTrue(pool.containsAllocatedSlot(allocationId).get());
+			// Releasing the request moves the slot from the allocated to the available slots.
+			pool.releaseSlot(requestId, null, null).get();
+
+			assertFalse(pool.containsAllocatedSlot(allocationId).get());
+			assertTrue(pool.containsAvailableSlot(allocationId).get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
+	}
+
+	/**
+	 * Tests that a timed-out {@code allocateSlot} call on the slot provider (ProviderAndOwner)
+	 * leads to a {@code releaseSlot} call on the pool, after which no pending request remains.
+	 */
+	@Test
+	public void testProviderAndOwner() throws Exception {
+		final JobID jid = new JobID();
+
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			Time.milliseconds(10L),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
+		// Completed with the request id of the first releaseSlot call made on the pool.
+		final CompletableFuture<SlotRequestId> releaseSlotFuture = new CompletableFuture<>();
+
+		pool.setReleaseSlotConsumer(
+			slotRequestID -> releaseSlotFuture.complete(slotRequestID));
+
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			pool.connectToResourceManager(resourceManagerGateway);
+
+			ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
+
+			// test the pending request is clear when timed out
+			CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
+				mockScheduledUnit,
+				true,
+				Collections.emptyList());
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			// wait for the cancel call on the SlotPool
+			releaseSlotFuture.get();
+
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
+	}
+
+	/**
+	 * SlotPool that reports releaseSlot calls to an optional consumer and exposes internal state.
+	 */
+	private static final class TestingSlotPool extends SlotPool {
+		// Invoked with the slot request id on every releaseSlot call; null until a consumer is set.
+		private volatile Consumer<SlotRequestId> releaseSlotConsumer;
+
+		public TestingSlotPool(
+				RpcService rpcService,
+				JobID jobId,
+				Clock clock,
+				Time slotRequestTimeout,
+				Time resourceManagerAllocationTimeout,
+				Time resourceManagerRequestTimeout) {
+			super(
+				rpcService,
+				jobId,
+				clock,
+				slotRequestTimeout,
+				resourceManagerAllocationTimeout,
+				resourceManagerRequestTimeout);
+
+			releaseSlotConsumer = null;
+		}
+
+		public void setReleaseSlotConsumer(Consumer<SlotRequestId> releaseSlotConsumer) {
+			this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer);
+		}
+
+		@Override
+		public CompletableFuture<Acknowledge> releaseSlot(
+			SlotRequestId slotRequestId,
+			@Nullable SlotSharingGroupId slotSharingGroupId,
+			@Nullable Throwable cause) {
+			final Consumer<SlotRequestId> currentReleaseSlotConsumer = releaseSlotConsumer;
+			// The local copy keeps the null check and the call on the same reference.
+			if (currentReleaseSlotConsumer != null) {
+				currentReleaseSlotConsumer.accept(slotRequestId);
+			}
+
+			return super.releaseSlot(slotRequestId, slotSharingGroupId, cause);
+		}
+		// The accessors below read pool state through callAsync and return the result as a future.
+		CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
+			return callAsync(
+				() -> getAllocatedSlots().contains(allocationId),
+				timeout);
+		}
+
+		CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
+			return callAsync(
+				() -> getAvailableSlots().contains(allocationId),
+				timeout);
+		}
+
+		CompletableFuture<Integer> getNumberOfPendingRequests() {
+			return callAsync(
+				() -> getPendingRequests().size(),
+				timeout);
+		}
+
+		CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
+			return callAsync(
+				() -> getWaitingForResourceManager().size(),
+				timeout);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
new file mode 100644
index 0000000..31be1ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test base for {@link SlotPool} related scheduling test cases.
+ *
+ * <p>A single {@link TestingRpcService} is shared by all tests of a class, whereas the
+ * {@link SlotPool}, its gateway, its slot provider and the
+ * {@link TestingResourceManagerGateway} are created anew before every test.
+ */
+public class SlotPoolSchedulingTestBase extends TestLogger {
+
+	private static final JobID JOB_ID = new JobID();
+
+	private static final JobMasterId JOB_MASTER_ID = new JobMasterId();
+
+	private static final String JOB_MASTER_ADDRESS = "foobar";
+
+	/** RPC service shared by all tests of the class; created and stopped once per class. */
+	private static TestingRpcService testingRpcService;
+
+	/** Slot pool under test; recreated before every test. */
+	protected SlotPool slotPool;
+
+	/** Self gateway of {@link #slotPool}. */
+	protected SlotPoolGateway slotPoolGateway;
+
+	/** Slot provider obtained from {@link #slotPool}. */
+	protected SlotProvider slotProvider;
+
+	/** Resource manager gateway the slot pool is connected to. */
+	protected TestingResourceManagerGateway testingResourceManagerGateway;
+
+	/**
+	 * Creates the RPC service shared by all tests of the class.
+	 */
+	@BeforeClass
+	public static void setup() {
+		testingRpcService = new TestingRpcService();
+	}
+
+	/**
+	 * Stops the shared RPC service if it has been created.
+	 */
+	@AfterClass
+	public static void teardown() {
+		if (testingRpcService == null) {
+			return;
+		}
+
+		testingRpcService.stopService();
+		testingRpcService = null;
+	}
+
+	/**
+	 * Creates and starts a fresh slot pool, exposes its gateway and slot provider and connects
+	 * it to a new testing resource manager gateway.
+	 */
+	@Before
+	public void setupBefore() throws Exception {
+		testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+		slotPool = new SlotPool(testingRpcService, JOB_ID);
+		slotPool.start(JOB_MASTER_ID, JOB_MASTER_ADDRESS);
+
+		slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+		slotProvider = slotPool.getSlotProvider();
+
+		slotPool.connectToResourceManager(testingResourceManagerGateway);
+	}
+
+	/**
+	 * Terminates the slot pool created for the test, if there is one.
+	 */
+	@After
+	public void teardownAfter() throws InterruptedException, ExecutionException, TimeoutException {
+		if (slotPool == null) {
+			return;
+		}
+
+		RpcUtils.terminateRpcEndpoint(slotPool, TestingUtils.TIMEOUT());
+		slotPool = null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
new file mode 100644
index 0000000..bb6c2b7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test cases for slot sharing with the {@link SlotPool}.
+ *
+ * <p>All tests request slots with queued scheduling enabled and then offer slots to the pool
+ * for the allocation ids that the pool requested from the testing resource manager gateway.
+ */
+public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase {
+
+	/**
+	 * Tests that a single queued slot request with a slot sharing group is fulfilled once the
+	 * requested slot is offered to the slot pool.
+	 */
+	@Test
+	public void testSingleQueuedSharedSlotScheduling() throws Exception {
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+		final CompletableFuture<LogicalSlot> logicalSlotFuture = allocateSharedSlot(
+			new JobVertexID(),
+			slotSharingGroupId);
+
+		assertFalse(logicalSlotFuture.isDone());
+
+		final AllocationID allocationId = allocationIdFuture.get();
+
+		final CompletableFuture<Boolean> offerFuture = offerSlot(taskManagerLocation, allocationId);
+
+		assertTrue(offerFuture.get());
+
+		final LogicalSlot logicalSlot = logicalSlotFuture.get();
+
+		assertEquals(slotSharingGroupId, logicalSlot.getSlotSharingGroupId());
+	}
+
+	/**
+	 * Tests that returned slot futures are failed if the allocation request is failed.
+	 */
+	@Test
+	public void testFailingQueuedSharedSlotScheduling() throws ExecutionException, InterruptedException {
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final CompletableFuture<LogicalSlot> logicalSlotFuture = allocateSharedSlot(
+			new JobVertexID(),
+			new SlotSharingGroupId());
+
+		final AllocationID allocationId = allocationIdFuture.get();
+
+		// this should fail the returned logical slot future
+		slotPoolGateway.failAllocation(allocationId, new FlinkException("Testing Exception"));
+
+		try {
+			logicalSlotFuture.get();
+			fail("The slot future should have failed.");
+		} catch (ExecutionException ee) {
+			assertTrue(ExceptionUtils.findThrowable(ee, FlinkException.class).isPresent());
+		}
+	}
+
+	/**
+	 * Tests queued slot scheduling with a single slot sharing group.
+	 */
+	@Test
+	public void testQueuedSharedSlotScheduling() throws InterruptedException, ExecutionException {
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+		final JobVertexID jobVertexId1 = new JobVertexID();
+		final JobVertexID jobVertexId2 = new JobVertexID();
+
+		final CompletableFuture<LogicalSlot> logicalSlotFuture1 = allocateSharedSlot(jobVertexId1, slotSharingGroupId);
+		final CompletableFuture<LogicalSlot> logicalSlotFuture2 = allocateSharedSlot(jobVertexId2, slotSharingGroupId);
+
+		assertFalse(logicalSlotFuture1.isDone());
+		assertFalse(logicalSlotFuture2.isDone());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		final CompletableFuture<LogicalSlot> logicalSlotFuture3 = allocateSharedSlot(jobVertexId1, slotSharingGroupId);
+		final CompletableFuture<LogicalSlot> logicalSlotFuture4 = allocateSharedSlot(jobVertexId2, slotSharingGroupId);
+
+		assertFalse(logicalSlotFuture3.isDone());
+		assertFalse(logicalSlotFuture4.isDone());
+
+		// block until the second slot request has been issued; its allocation id is never
+		// offered in this test, so the value itself is not needed
+		allocationIds.take();
+
+		// this should fulfill the first two slot futures
+		final CompletableFuture<Boolean> offerFuture = offerSlot(taskManagerLocation, allocationId1);
+
+		assertTrue(offerFuture.get());
+
+		final LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+		final LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+		assertEquals(logicalSlot1.getTaskManagerLocation(), logicalSlot2.getTaskManagerLocation());
+		assertEquals(allocationId1, logicalSlot1.getAllocationId());
+		assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+		assertFalse(logicalSlotFuture3.isDone());
+		assertFalse(logicalSlotFuture4.isDone());
+
+		// release the shared slot by releasing the individual tasks
+		logicalSlot1.releaseSlot(null);
+		logicalSlot2.releaseSlot(null);
+
+		final LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+		final LogicalSlot logicalSlot4 = logicalSlotFuture4.get();
+
+		assertEquals(logicalSlot3.getTaskManagerLocation(), logicalSlot4.getTaskManagerLocation());
+		assertEquals(allocationId1, logicalSlot3.getAllocationId());
+		assertEquals(allocationId1, logicalSlot4.getAllocationId());
+	}
+
+	/**
+	 * Tests queued slot scheduling with multiple slot sharing groups.
+	 */
+	@Test
+	public void testQueuedMultipleSlotSharingGroups() throws ExecutionException, InterruptedException {
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(4);
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final SlotSharingGroupId slotSharingGroupId1 = new SlotSharingGroupId();
+		final SlotSharingGroupId slotSharingGroupId2 = new SlotSharingGroupId();
+		final JobVertexID jobVertexId1 = new JobVertexID();
+		final JobVertexID jobVertexId2 = new JobVertexID();
+		final JobVertexID jobVertexId3 = new JobVertexID();
+		final JobVertexID jobVertexId4 = new JobVertexID();
+
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+		final CompletableFuture<LogicalSlot> logicalSlotFuture1 = allocateSharedSlot(jobVertexId1, slotSharingGroupId1);
+		final CompletableFuture<LogicalSlot> logicalSlotFuture2 = allocateSharedSlot(jobVertexId2, slotSharingGroupId1);
+		final CompletableFuture<LogicalSlot> logicalSlotFuture3 = allocateSharedSlot(jobVertexId3, slotSharingGroupId2);
+		final CompletableFuture<LogicalSlot> logicalSlotFuture4 = allocateSharedSlot(jobVertexId4, slotSharingGroupId2);
+
+		assertFalse(logicalSlotFuture1.isDone());
+		assertFalse(logicalSlotFuture2.isDone());
+		assertFalse(logicalSlotFuture3.isDone());
+		assertFalse(logicalSlotFuture4.isDone());
+
+		// we expect two slot requests
+		final AllocationID allocationId1 = allocationIds.take();
+		final AllocationID allocationId2 = allocationIds.take();
+
+		final CompletableFuture<Boolean> offerFuture1 = offerSlot(taskManagerLocation, allocationId1);
+		final CompletableFuture<Boolean> offerFuture2 = offerSlot(taskManagerLocation, allocationId2);
+
+		assertTrue(offerFuture1.get());
+		assertTrue(offerFuture2.get());
+
+		final LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+		final LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+		final LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+		final LogicalSlot logicalSlot4 = logicalSlotFuture4.get();
+
+		assertEquals(logicalSlot1.getTaskManagerLocation(), logicalSlot2.getTaskManagerLocation());
+		assertEquals(logicalSlot3.getTaskManagerLocation(), logicalSlot4.getTaskManagerLocation());
+
+		// both offers come from the same task manager, so the allocation ids are what
+		// actually shows that the members of a sharing group ended up in the same slot
+		assertEquals(allocationId1, logicalSlot1.getAllocationId());
+		assertEquals(allocationId1, logicalSlot2.getAllocationId());
+		assertEquals(allocationId2, logicalSlot3.getAllocationId());
+		assertEquals(allocationId2, logicalSlot4.getAllocationId());
+	}
+
+	/**
+	 * Requests a slot for the given vertex and slot sharing group with queued scheduling
+	 * enabled, no co-location constraint and no preferred locations.
+	 *
+	 * @param jobVertexId vertex for which the slot is requested
+	 * @param slotSharingGroupId slot sharing group of the request
+	 * @return future of the requested logical slot
+	 */
+	private CompletableFuture<LogicalSlot> allocateSharedSlot(
+			JobVertexID jobVertexId,
+			SlotSharingGroupId slotSharingGroupId) {
+		return slotProvider.allocateSlot(
+			new ScheduledUnit(
+				jobVertexId,
+				slotSharingGroupId,
+				null),
+			true,
+			Collections.emptyList());
+	}
+
+	/**
+	 * Offers slot index 0 with an unknown resource profile for the given allocation id to the
+	 * slot pool, using a fresh {@link SimpleAckingTaskManagerGateway}.
+	 *
+	 * @param taskManagerLocation location of the task manager offering the slot
+	 * @param allocationId allocation id of the offered slot
+	 * @return future of the slot pool's answer to the offer
+	 */
+	private CompletableFuture<Boolean> offerSlot(
+			TaskManagerLocation taskManagerLocation,
+			AllocationID allocationId) {
+		return slotPoolGateway.offerSlot(
+			taskManagerLocation,
+			new SimpleAckingTaskManagerGateway(),
+			new SlotOffer(
+				allocationId,
+				0,
+				ResourceProfile.UNKNOWN));
+	}
+
+}