You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 15:33:36 UTC

[2/6] flink git commit: [FLINK-8087] Decouple Slot from AllocatedSlot

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 2e6558a..586f51b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.LogicalSlot;
@@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
@@ -285,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
+			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
 
 		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -366,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
+			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -448,8 +447,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		TaskManagerLocation location = new TaskManagerLocation(
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
 
-		AllocatedSlot slot = new AllocatedSlot(
-				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
+		SimpleSlotContext slot = new SimpleSlotContext(
+			new AllocationID(),
+			location,
+			0,
+			taskManager);
 
 		return new SimpleSlot(slot, slotOwner, 0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index 4ce3f9d..3c8d994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -111,28 +111,28 @@ public class ExecutionGraphStopTest extends TestLogger {
 
 		// deploy source 1
 		for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy source 2
 		for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 1
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 2
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
-			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
 			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
 			ev.getCurrentExecutionAttempt().deploy();
 		}
@@ -164,7 +164,7 @@ public class ExecutionGraphStopTest extends TestLogger {
 		when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
 				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
+		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(gateway);
 
 		exec.tryAssignResource(slot);
 		exec.deploy();

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 42a63ec..06ffaa0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
@@ -49,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -240,19 +239,20 @@ public class ExecutionGraphTestUtils {
 	//  Mocking Slots
 	// ------------------------------------------------------------------------
 
-	public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) {
+	public static SimpleSlot createMockSimpleSlot(TaskManagerGateway gateway) {
 		final TaskManagerLocation location = new TaskManagerLocation(
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
 
-		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+		final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
 				new AllocationID(),
-				jid,
 				location,
 				0,
-				ResourceProfile.UNKNOWN,
 				gateway);
 
-		return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0);
+		return new SimpleSlot(
+			allocatedSlot,
+			mock(SlotOwner.class),
+			0);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index c6fb836..71d6f51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -81,7 +81,6 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
@@ -121,7 +120,6 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
@@ -171,7 +169,6 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
@@ -285,11 +282,12 @@ public class ExecutionTest extends TestLogger {
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
 		final SimpleSlot slot = new SimpleSlot(
-			new JobID(),
 			slotOwner,
 			new LocalTaskManagerLocation(),
 			0,
-			new SimpleAckingTaskManagerGateway());
+			new SimpleAckingTaskManagerGateway(),
+			null,
+			null);
 
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
 		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1b8daca..cd613f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -18,36 +18,42 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 import scala.concurrent.ExecutionContext;
 
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
 @SuppressWarnings("serial")
 public class ExecutionVertexCancelTest extends TestLogger {
 
@@ -134,7 +140,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					executionContext, 2);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			vertex.deployToSlot(slot);
 
@@ -202,7 +208,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					2);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			vertex.deployToSlot(slot);
 
@@ -262,7 +268,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					1);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -302,7 +308,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					1);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -350,7 +356,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 					1);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -383,7 +389,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 0);
 
 			Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot = instance.allocateSimpleSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -458,7 +464,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
 				Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot();
 
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -501,7 +507,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 				setVertexState(vertex, ExecutionState.CANCELING);
 
 				Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot();
 
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -517,7 +523,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 						AkkaUtils.getDefaultTimeout());
 
 				Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot();
 
 				setVertexResource(vertex, slot);
 				setVertexState(vertex, ExecutionState.CANCELING);

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 973c7d4..7f97d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -67,7 +67,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
@@ -104,7 +104,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
@@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.defaultExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -191,7 +191,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -221,7 +221,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -265,7 +265,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final Instance instance = getInstance(
 				new ActorTaskManagerGateway(
 					new SimpleActorGateway(TestingUtils.directExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
@@ -310,7 +310,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 						context,
 						2)));
 
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -372,7 +372,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 		result.getPartitions()[0].addConsumer(mockEdge, 0);
 
 		AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
-		when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
+		when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID());
 
 		LogicalSlot slot = mock(LogicalSlot.class);
 		when(slot.getAllocationId()).thenReturn(new AllocationID());

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 15d021a..98f7259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -37,7 +36,8 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -233,8 +233,8 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		//  - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
 		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
 
-		AllocatedSlot slot = new AllocatedSlot(
-				new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+		SlotContext slot = new SimpleSlotContext(
+				new AllocationID(), location, 0, mock(TaskManagerGateway.class));
 
 		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 2941739..9310912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -57,7 +57,7 @@ public class ExecutionVertexSchedulingTest {
 
 			// a slot than cannot be deployed to
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 			
 			slot.releaseInstanceSlot();
 			assertTrue(slot.isReleased());
@@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
 
 			// a slot than cannot be deployed to
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			slot.releaseInstanceSlot();
 			assertTrue(slot.isReleased());
@@ -126,7 +126,7 @@ public class ExecutionVertexSchedulingTest {
 
 			final Instance instance = getInstance(new ActorTaskManagerGateway(
 				new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())));
-			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 14e0e66..9a19d24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -29,7 +28,8 @@ import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
-	private final ArrayDeque<AllocatedSlot> slots;
+	private final ArrayDeque<SlotContext> slots;
 
 	public SimpleSlotProvider(JobID jobId, int numSlots) {
 		this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
@@ -60,12 +60,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 		this.slots = new ArrayDeque<>(numSlots);
 
 		for (int i = 0; i < numSlots; i++) {
-			AllocatedSlot as = new AllocatedSlot(
+			SimpleSlotContext as = new SimpleSlotContext(
 					new AllocationID(),
-					jobId,
 					new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
 					0,
-					ResourceProfile.UNKNOWN,
 					taskManagerGateway);
 			slots.add(as);
 		}
@@ -76,7 +74,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 			ScheduledUnit task,
 			boolean allowQueued,
 			Collection<TaskManagerLocation> preferredLocations) {
-		final AllocatedSlot slot;
+		final SlotContext slot;
 
 		synchronized (slots) {
 			if (slots.isEmpty()) {
@@ -98,7 +96,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	@Override
 	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
 		synchronized (slots) {
-			slots.add(slot.getAllocatedSlot());
+			slots.add(slot.getSlotContext());
 		}
 		return CompletableFuture.completedFuture(true);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 0e4bfc0..bc396c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -28,10 +33,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
-public class AllocatedSlotsTest {
+public class AllocatedSlotsTest extends TestLogger {
 
 	@Test
 	public void testOperations() throws Exception {
@@ -39,12 +42,13 @@ public class AllocatedSlotsTest {
 
 		final AllocationID allocation1 = new AllocationID();
 		final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID();
-		final ResourceID resource1 = new ResourceID("resource1");
-		final Slot slot1 = createSlot(resource1, allocation1);
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final ResourceID resource1 = taskManagerLocation.getResourceID();
+		final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
 
 		allocatedSlots.add(slotRequestID, slot1);
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 
 		assertEquals(slot1, allocatedSlots.get(allocation1));
@@ -53,12 +57,12 @@ public class AllocatedSlotsTest {
 
 		final AllocationID allocation2 = new AllocationID();
 		final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID();
-		final Slot slot2 = createSlot(resource1, allocation2);
+		final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
 
 		allocatedSlots.add(slotRequestID2, slot2);
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 
 		assertEquals(slot1, allocatedSlots.get(allocation1));
@@ -68,14 +72,15 @@ public class AllocatedSlotsTest {
 
 		final AllocationID allocation3 = new AllocationID();
 		final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID();
-		final ResourceID resource2 = new ResourceID("resource2");
-		final Slot slot3 = createSlot(resource2, allocation3);
+		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
+		final ResourceID resource2 = taskManagerLocation2.getResourceID();
+		final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);
 
 		allocatedSlots.add(slotRequestID3, slot3);
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 		assertTrue(allocatedSlots.containResource(resource2));
 
@@ -86,11 +91,11 @@ public class AllocatedSlotsTest {
 		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
 		assertEquals(3, allocatedSlots.size());
 
-		allocatedSlots.remove(slot2);
+		allocatedSlots.remove(slot2.getAllocationId());
 
-		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
 		assertTrue(allocatedSlots.containResource(resource1));
 		assertTrue(allocatedSlots.containResource(resource2));
 
@@ -101,11 +106,11 @@ public class AllocatedSlotsTest {
 		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
 		assertEquals(2, allocatedSlots.size());
 
-		allocatedSlots.remove(slot1);
+		allocatedSlots.remove(slot1.getAllocationId());
 
-		assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
 		assertFalse(allocatedSlots.containResource(resource1));
 		assertTrue(allocatedSlots.containResource(resource2));
 
@@ -116,11 +121,11 @@ public class AllocatedSlotsTest {
 		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
 		assertEquals(1, allocatedSlots.size());
 
-		allocatedSlots.remove(slot3);
+		allocatedSlots.remove(slot3.getAllocationId());
 
-		assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
-		assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+		assertFalse(allocatedSlots.contains(slot3.getAllocationId()));
 		assertFalse(allocatedSlots.containResource(resource1));
 		assertFalse(allocatedSlots.containResource(resource2));
 
@@ -132,13 +137,13 @@ public class AllocatedSlotsTest {
 		assertEquals(0, allocatedSlots.size());
 	}
 
-	private Slot createSlot(final ResourceID resourceId, final AllocationID allocationId) {
-		AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
-		Slot slot = mock(Slot.class);
-		when(slot.getTaskManagerID()).thenReturn(resourceId);
-		when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
-
-		when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId);
-		return slot;
+	private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) {
+		return new AllocatedSlot(
+			allocationId,
+			taskManagerLocation,
+			0,
+			ResourceProfile.UNKNOWN,
+			new SimpleAckingTaskManagerGateway(),
+			new DummySlotOwner());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 4ed88c4..9ede899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class AvailableSlotsTest {
+public class AvailableSlotsTest extends TestLogger {
 
 	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
 
@@ -57,27 +58,27 @@ public class AvailableSlotsTest {
 		availableSlots.add(slot3, 3L);
 
 		assertEquals(3, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
-		assertTrue(availableSlots.contains(slot2.getSlotAllocationId()));
-		assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot1.getAllocationId()));
+		assertTrue(availableSlots.contains(slot2.getAllocationId()));
+		assertTrue(availableSlots.contains(slot3.getAllocationId()));
 		assertTrue(availableSlots.containsTaskManager(resource1));
 		assertTrue(availableSlots.containsTaskManager(resource2));
 
 		availableSlots.removeAllForTaskManager(resource1);
 
 		assertEquals(1, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
-		assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
-		assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot1.getAllocationId()));
+		assertFalse(availableSlots.contains(slot2.getAllocationId()));
+		assertTrue(availableSlots.contains(slot3.getAllocationId()));
 		assertFalse(availableSlots.containsTaskManager(resource1));
 		assertTrue(availableSlots.containsTaskManager(resource2));
 
 		availableSlots.removeAllForTaskManager(resource2);
 
 		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
-		assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
-		assertFalse(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot1.getAllocationId()));
+		assertFalse(availableSlots.contains(slot2.getAllocationId()));
+		assertFalse(availableSlots.contains(slot3.getAllocationId()));
 		assertFalse(availableSlots.containsTaskManager(resource1));
 		assertFalse(availableSlots.containsTaskManager(resource2));
 	}
@@ -92,7 +93,7 @@ public class AvailableSlotsTest {
 		availableSlots.add(slot1, 1L);
 
 		assertEquals(1, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot1.getAllocationId()));
 		assertTrue(availableSlots.containsTaskManager(resource1));
 
 		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
@@ -100,7 +101,7 @@ public class AvailableSlotsTest {
 		SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
 		assertEquals(slot1, slotAndLocality.slot());
 		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot1.getAllocationId()));
 		assertFalse(availableSlots.containsTaskManager(resource1));
 	}
 
@@ -112,10 +113,10 @@ public class AvailableSlotsTest {
 
 		return new AllocatedSlot(
 			new AllocationID(),
-			new JobID(),
 			mockTaskManagerLocation,
 			0,
 			DEFAULT_TESTING_PROFILE,
-			mockTaskManagerGateway);
+			mockTaskManagerGateway,
+			new DummySlotOwner());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 5b85f72..229237d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -18,17 +18,22 @@
 
 package org.apache.flink.runtime.instance;
 
-import static org.junit.Assert.*;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
 import org.junit.Test;
 
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Tests for the {@link Instance} class.
  */
@@ -53,10 +58,10 @@ public class InstanceTest {
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 
-			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot();
+			SimpleSlot slot2 = instance.allocateSimpleSlot();
+			SimpleSlot slot3 = instance.allocateSimpleSlot();
+			SimpleSlot slot4 = instance.allocateSimpleSlot();
 
 			assertNotNull(slot1);
 			assertNotNull(slot2);
@@ -69,7 +74,7 @@ public class InstanceTest {
 					slot3.getSlotNumber() + slot4.getSlotNumber());
 
 			// no more slots
-			assertNull(instance.allocateSimpleSlot(new JobID()));
+			assertNull(instance.allocateSimpleSlot());
 			try {
 				instance.returnAllocatedSlot(slot2);
 				fail("instance accepted a non-cancelled slot.");
@@ -118,9 +123,9 @@ public class InstanceTest {
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
-			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot();
+			SimpleSlot slot2 = instance.allocateSimpleSlot();
+			SimpleSlot slot3 = instance.allocateSimpleSlot();
 
 			instance.markDead();
 
@@ -154,9 +159,9 @@ public class InstanceTest {
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
-			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
-			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+			SimpleSlot slot1 = instance.allocateSimpleSlot();
+			SimpleSlot slot2 = instance.allocateSimpleSlot();
+			SimpleSlot slot3 = instance.allocateSimpleSlot();
 
 			instance.cancelAndReleaseAllSlots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 1e2b6af..5104e48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -34,7 +33,12 @@ import org.junit.Test;
 
 import java.util.Collections;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the allocation, properties, and release of shared slots.
@@ -46,7 +50,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateAndReleaseEmptySlot() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vertexId = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId);
@@ -62,7 +65,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(2, instance.getNumberOfAvailableSlots());
 			
 			// allocate a shared slot
-			SharedSlot slot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot slot = instance.allocateSharedSlot(assignment);
 			assertEquals(2, instance.getTotalNumberOfSlots());
 			assertEquals(1, instance.getNumberOfAllocatedSlots());
 			assertEquals(1, instance.getNumberOfAvailableSlots());
@@ -110,7 +113,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateSimpleSlotsAndReleaseFromRoot() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid1 = new JobVertexID();
 			JobVertexID vid2 = new JobVertexID();
 			JobVertexID vid3 = new JobVertexID();
@@ -122,7 +124,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// allocate a series of sub slots
 
@@ -134,7 +136,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub1.getNumberLeaves());
 			assertEquals(vid1, sub1.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID());
-			assertEquals(jobId, sub1.getJobID());
 			assertEquals(sharedSlot, sub1.getParent());
 			assertEquals(sharedSlot, sub1.getRoot());
 			assertEquals(0, sub1.getRootSlotNumber());
@@ -153,7 +154,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub2.getNumberLeaves());
 			assertEquals(vid2, sub2.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID());
-			assertEquals(jobId, sub2.getJobID());
 			assertEquals(sharedSlot, sub2.getParent());
 			assertEquals(sharedSlot, sub2.getRoot());
 			assertEquals(0, sub2.getRootSlotNumber());
@@ -172,7 +172,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub3.getNumberLeaves());
 			assertEquals(vid3, sub3.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID());
-			assertEquals(jobId, sub3.getJobID());
 			assertEquals(sharedSlot, sub3.getParent());
 			assertEquals(sharedSlot, sub3.getRoot());
 			assertEquals(0, sub3.getRootSlotNumber());
@@ -192,7 +191,6 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, sub4.getNumberLeaves());
 			assertEquals(vid4, sub4.getGroupID());
 			assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID());
-			assertEquals(jobId, sub4.getJobID());
 			assertEquals(sharedSlot, sub4.getParent());
 			assertEquals(sharedSlot, sub4.getRoot());
 			assertEquals(0, sub4.getRootSlotNumber());
@@ -235,7 +233,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateSimpleSlotsAndReleaseFromLeaves() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid1 = new JobVertexID();
 			JobVertexID vid2 = new JobVertexID();
 			JobVertexID vid3 = new JobVertexID();
@@ -246,7 +243,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// allocate a series of sub slots
 
@@ -320,7 +317,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void allocateAndReleaseInMixedOrder() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid1 = new JobVertexID();
 			JobVertexID vid2 = new JobVertexID();
 			JobVertexID vid3 = new JobVertexID();
@@ -331,7 +327,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// allocate a series of sub slots
 
@@ -427,7 +423,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 			
 			// get the first simple slot
 			SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
@@ -563,7 +559,7 @@ public class SharedSlotsTest extends TestLogger {
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 			// allocate a shared slot
-			SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			// get the first simple slot
 			SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
@@ -607,7 +603,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void testImmediateReleaseOneLevel() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid = new JobVertexID();
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
@@ -615,7 +610,7 @@ public class SharedSlotsTest extends TestLogger {
 
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
 			sub.releaseInstanceSlot();
@@ -635,7 +630,6 @@ public class SharedSlotsTest extends TestLogger {
 	@Test
 	public void testImmediateReleaseTwoLevel() {
 		try {
-			JobID jobId = new JobID();
 			JobVertexID vid = new JobVertexID();
 			JobVertex vertex = new JobVertex("vertex", vid);
 			
@@ -647,7 +641,7 @@ public class SharedSlotsTest extends TestLogger {
 			
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
-			SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 42cbbbf..6d572ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SimpleSlotTest extends TestLogger {
+public class SimpleSlotTest extends  TestLogger {
 
 	@Test
 	public void testStateTransitions() {
@@ -137,6 +136,6 @@ public class SimpleSlotTest extends TestLogger {
 			hardwareDescription,
 			1);
 
-		return instance.allocateSimpleSlot(new JobID());
+		return instance.allocateSimpleSlot();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 8875e00..5d82f47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -36,6 +36,9 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
@@ -158,7 +161,7 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
 
-			slotPoolGateway.cancelSlotAllocation(requestId).get();
+			slotPoolGateway.cancelSlotRequest(requestId).get();
 
 			assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
 		} finally {
@@ -202,7 +205,7 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
 
-			slotPoolGateway.cancelSlotAllocation(requestId).get();
+			slotPoolGateway.cancelSlotRequest(requestId).get();
 			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
 		} finally {
 			RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -252,17 +255,22 @@ public class SlotPoolRpcTest extends TestLogger {
 			}
 
 			AllocationID allocationId = allocationIdFuture.get();
-			ResourceID resourceID = ResourceID.generate();
-			AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE);
-			slotPoolGateway.registerTaskManager(resourceID).get();
+			final SlotOffer slotOffer = new SlotOffer(
+				allocationId,
+				0,
+				DEFAULT_TESTING_PROFILE);
+			final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
 
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
 
 			assertTrue(pool.containsAllocatedSlot(allocationId).get());
 
-			pool.cancelSlotAllocation(requestId).get();
+			pool.cancelSlotRequest(requestId).get();
 
 			assertFalse(pool.containsAllocatedSlot(allocationId).get());
 			assertTrue(pool.containsAvailableSlot(allocationId).get());
@@ -351,14 +359,14 @@ public class SlotPoolRpcTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID slotRequestId) {
+		public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
 			final Consumer<SlotRequestID> currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer;
 
 			if (currentCancelSlotAllocationConsumer != null) {
 				currentCancelSlotAllocationConsumer.accept(slotRequestId);
 			}
 
-			return super.cancelSlotAllocation(slotRequestId);
+			return super.cancelSlotRequest(slotRequestId);
 		}
 
 		CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 450d377..9d90a12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -74,10 +76,17 @@ public class SlotPoolTest extends TestLogger {
 
 	private JobID jobId;
 
+	private TaskManagerLocation taskManagerLocation;
+
+	private TaskManagerGateway taskManagerGateway;
+
 	@Before
 	public void setUp() throws Exception {
 		this.rpcService = new TestingRpcService();
 		this.jobId = new JobID();
+
+		taskManagerLocation = new LocalTaskManagerLocation();
+		taskManagerGateway = new SimpleAckingTaskManagerGateway();
 	}
 
 	@After
@@ -92,8 +101,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -104,14 +112,17 @@ public class SlotPoolTest extends TestLogger {
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
 			assertTrue(future.isDone());
 			assertTrue(slot.isAlive());
-			assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
-			assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
+			assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -124,8 +135,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPool.registerTaskManager(resourceID);
+			slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -139,8 +149,12 @@ public class SlotPoolTest extends TestLogger {
 
 			final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequests.get(0).getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
@@ -158,7 +172,7 @@ public class SlotPoolTest extends TestLogger {
 			assertTrue(slot2.isAlive());
 			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
 			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
-			assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
+			assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -171,8 +185,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future1.isDone());
@@ -182,8 +195,12 @@ public class SlotPoolTest extends TestLogger {
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
@@ -214,8 +231,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
@@ -225,29 +241,36 @@ public class SlotPoolTest extends TestLogger {
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
+
 			// slot from unregistered resource
-			AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertFalse(slotPoolGateway.offerSlot(invalid).get());
+			assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());
 
-			AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
+			final SlotOffer nonRequestedSlotOffer = new SlotOffer(
+				new AllocationID(),
+				0,
+				DEFAULT_TESTING_PROFILE);
 
 			// we'll also accept non requested slots
-			assertTrue(slotPoolGateway.offerSlot(notRequested).get());
-
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());
 
 			// accepted slot
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 			LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(slot.isAlive());
 
 			// duplicated offer with using slot
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 			assertTrue(slot.isAlive());
 
 			// duplicated offer with free slot
 			slot.releaseSlot();
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -261,8 +284,8 @@ public class SlotPoolTest extends TestLogger {
 
 		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
 			@Override
-			public void returnAllocatedSlot(Slot slot) {
-				super.returnAllocatedSlot(slot);
+			public void returnAllocatedSlot(SlotContext allocatedSlot) {
+				super.returnAllocatedSlot(allocatedSlot);
 
 				slotReturnFuture.complete(true);
 			}
@@ -270,8 +293,7 @@ public class SlotPoolTest extends TestLogger {
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			ResourceID resourceID = new ResourceID("resource");
-			slotPoolGateway.registerTaskManager(resourceID);
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
 			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
@@ -282,14 +304,18 @@ public class SlotPoolTest extends TestLogger {
 
 			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
-			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			final SlotOffer slotOffer = new SlotOffer(
+				slotRequest.getAllocationId(),
+				0,
+				DEFAULT_TESTING_PROFILE);
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
 			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
 			assertFalse(future2.isDone());
 
-			slotPoolGateway.releaseTaskManager(resourceID);
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
 
 			// wait until the slot has been returned
 			slotReturnFuture.get();
@@ -378,24 +404,4 @@ public class SlotPoolTest extends TestLogger {
 
 		return slotPool.getSelfGateway(SlotPoolGateway.class);
 	}
-
-	static AllocatedSlot createAllocatedSlot(
-			final ResourceID resourceId,
-			final AllocationID allocationId,
-			final JobID jobId,
-			final ResourceProfile resourceProfile) {
-		TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
-		when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
-
-		TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
-
-		return new AllocatedSlot(
-			allocationId,
-			jobId,
-			mockTaskManagerLocation,
-			0,
-			resourceProfile,
-			mockTaskManagerGateway);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
index dca47d3..28cab72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -49,14 +48,12 @@ public class SlotSharingGroupAssignmentTest extends TestLogger {
 		final int numberSlots = 2;
 		final JobVertexID sourceId = new JobVertexID();
 		final JobVertexID sinkId = new JobVertexID();
-		final JobID jobId = new JobID();
 
 		for (int i = 0; i < numberTaskManagers; i++) {
 			final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000);
 
 			for (int j = 0; j < numberSlots; j++) {
 				final SharedSlot slot = new SharedSlot(
-					jobId,
 					mock(SlotOwner.class),
 					taskManagerLocation,
 					j,

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 3f267ac..d40ff61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -90,10 +90,10 @@ public class CoLocationConstraintTest {
 			Instance instance1 = SchedulerTestUtils.getRandomInstance(2);
 			Instance instance2 = SchedulerTestUtils.getRandomInstance(2);
 			
-			SharedSlot slot1_1 = instance1.allocateSharedSlot(jid, assignment);
-			SharedSlot slot1_2 = instance1.allocateSharedSlot(jid, assignment);
-			SharedSlot slot2_1 = instance2.allocateSharedSlot(jid, assignment);
-			SharedSlot slot2_2 = instance2.allocateSharedSlot(jid, assignment);
+			SharedSlot slot1_1 = instance1.allocateSharedSlot(assignment);
+			SharedSlot slot1_2 = instance1.allocateSharedSlot(assignment);
+			SharedSlot slot2_1 = instance2.allocateSharedSlot(assignment);
+			SharedSlot slot2_2 = instance2.allocateSharedSlot(assignment);
 			
 			// constraint is still completely unassigned
 			assertFalse(constraint.isAssigned());

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
new file mode 100644
index 0000000..6894542
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link SlotOwner} stub for tests: {@code returnAllocatedSlot} ignores the given slot and always returns a future already completed with {@code false}.
+ */
+public class DummySlotOwner implements SlotOwner {
+	@Override
+	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+		return CompletableFuture.completedFuture(false);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 9c12fff..01f445b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -58,12 +58,14 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -169,7 +171,7 @@ public class TaskExecutorITCase extends TestLogger {
 		when(jmGateway.getHostname()).thenReturn(jmAddress);
 		when(jmGateway.offerSlots(
 			eq(taskManagerResourceId),
-			any(Iterable.class),
+			any(Collection.class),
 			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 		when(jmGateway.getFencingToken()).thenReturn(jobMasterId);
 
@@ -214,7 +216,7 @@ public class TaskExecutorITCase extends TestLogger {
 
 			verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
 				eq(taskManagerResourceId),
-				(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
+				(Collection<SlotOffer>)argThat(Matchers.contains(slotOffer)),
 				any(Time.class));
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6372792..29d07fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -852,7 +852,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 		when(jobMasterGateway.offerSlots(
 			any(ResourceID.class),
-			any(Iterable.class),
+			any(Collection.class),
 			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -904,7 +904,7 @@ public class TaskExecutorTest extends TestLogger {
 			// the job leader should get the allocation id offered
 			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
 					any(ResourceID.class),
-					(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+					(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
 					any(Time.class));
 
 			// check if a concurrent error occurred
@@ -975,7 +975,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 		when(jobMasterGateway.offerSlots(
-				any(ResourceID.class), any(Iterable.class), any(Time.class)))
+				any(ResourceID.class), any(Collection.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -1315,7 +1315,7 @@ public class TaskExecutorTest extends TestLogger {
 			when(
 				jobMasterGateway.offerSlots(
 					any(ResourceID.class),
-					any(Iterable.class),
+					any(Collection.class),
 					any(Time.class)))
 				.thenReturn(offerResultFuture);
 
@@ -1323,7 +1323,7 @@ public class TaskExecutorTest extends TestLogger {
 			// been properly started. This will also offer the slots to the job master
 			jobLeaderService.addJob(jobId, jobManagerAddress);
 
-			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), any(Time.class));
+			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class));
 
 			// submit the task without having acknowledge the offered slots
 			tmGateway.submitTask(tdd, jobMasterId, timeout);