You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 20:01:02 UTC

[flink] branch master updated (6b1be17 -> f75d8e1)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6b1be17  [hotfix] Let FlinkKinesisConsumerTest extend TestLogger
     new c1ffb72  [hotfix] Introduce TestingLogicalSlotBuilder
     new f75d8e1  [FLINK-13334][coordination] Remove legacy slot implementation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/executiongraph/ExecutionJobVertex.java |   4 -
 .../apache/flink/runtime/instance/SharedSlot.java  | 333 -----------
 .../apache/flink/runtime/instance/SimpleSlot.java  | 290 ---------
 .../org/apache/flink/runtime/instance/Slot.java    | 381 ------------
 .../instance/SlotSharingGroupAssignment.java       | 664 ---------------------
 .../jobmanager/scheduler/CoLocationConstraint.java |  91 +--
 .../jobmanager/scheduler/CoLocationGroup.java      |   6 -
 .../jobmanager/scheduler/SlotSharingGroup.java     |  23 -
 .../ExecutionGraphDeploymentTest.java              |  34 +-
 .../executiongraph/ExecutionGraphMetricsTest.java  |   6 +-
 .../ExecutionGraphPartitionReleaseTest.java        |   4 +-
 .../ExecutionGraphSchedulingTest.java              |  80 +--
 .../executiongraph/ExecutionGraphTestUtils.java    |   4 +-
 .../ExecutionPartitionLifecycleTest.java           |  13 +-
 .../runtime/executiongraph/ExecutionTest.java      |  36 +-
 .../executiongraph/ExecutionVertexCancelTest.java  |  16 +-
 .../ExecutionVertexDeploymentTest.java             |  13 +-
 .../ExecutionVertexLocalityTest.java               |  18 +-
 .../ExecutionVertexSchedulingTest.java             |   8 +-
 .../executiongraph/utils/SimpleSlotProvider.java   |  18 +-
 .../flink/runtime/instance/SharedSlotsTest.java    | 630 -------------------
 .../flink/runtime/instance/SimpleSlotTest.java     | 135 -----
 .../scheduler/ScheduleWithCoLocationHintTest.java  |  10 -
 .../runtime/jobmaster/TestingLogicalSlot.java      |  69 +--
 .../jobmaster/TestingLogicalSlotBuilder.java       |  93 +++
 .../DefaultExecutionSlotAllocatorTest.java         |   4 +-
 ...GraphToInputsLocationsRetrieverAdapterTest.java |   3 +-
 27 files changed, 233 insertions(+), 2753 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java


[flink] 01/02: [hotfix] Introduce TestingLogicalSlotBuilder

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c1ffb7204a60a9b2aad9b73881a0c4d82e5f9324
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Aug 2 19:01:18 2019 +0200

    [hotfix] Introduce TestingLogicalSlotBuilder
---
 .../ExecutionGraphDeploymentTest.java              | 10 +--
 .../executiongraph/ExecutionGraphMetricsTest.java  |  6 +-
 .../ExecutionGraphPartitionReleaseTest.java        |  4 +-
 .../ExecutionGraphSchedulingTest.java              | 24 ++----
 .../executiongraph/ExecutionGraphTestUtils.java    |  4 +-
 .../runtime/executiongraph/ExecutionTest.java      |  4 +-
 .../executiongraph/ExecutionVertexCancelTest.java  | 16 ++--
 .../ExecutionVertexDeploymentTest.java             | 13 +--
 .../ExecutionVertexSchedulingTest.java             |  8 +-
 .../executiongraph/utils/SimpleSlotProvider.java   | 18 ++---
 .../runtime/jobmaster/TestingLogicalSlot.java      | 69 ++++------------
 .../jobmaster/TestingLogicalSlotBuilder.java       | 93 ++++++++++++++++++++++
 .../DefaultExecutionSlotAllocatorTest.java         |  4 +-
 ...GraphToInputsLocationsRetrieverAdapterTest.java |  3 +-
 14 files changed, 161 insertions(+), 115 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d10fcbd..13f74f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -58,7 +58,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -199,7 +199,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 				tdd.complete(taskDeploymentDescriptor);
 			}));
 
-			final LogicalSlot slot = new TestingLogicalSlot(taskManagerGateway);
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -443,7 +443,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
 		for (int i = 0; i < dop1; i++) {
-			slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot()));
+			slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
 		}
 
 		final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst());
@@ -507,7 +507,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
 		for (int i = 0; i < dop1 + dop2; i++) {
-			slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot()));
+			slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
 		}
 
 		final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst());
@@ -706,7 +706,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		Collections.shuffle(shuffledFutures);
 
 		for (CompletableFuture<LogicalSlot> slotFuture : shuffledFutures) {
-			slotFuture.complete(new TestingLogicalSlot(taskManagerGateway));
+			slotFuture.complete(new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot());
 		}
 
 		final List<ExecutionAttemptID> submittedTasks = new ArrayList<>(numberTasks);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 72d3416..7389296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
@@ -71,8 +71,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			Configuration jobConfig = new Configuration();
 			Time timeout = Time.seconds(10L);
 
-			CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
-			CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
+			CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot());
+			CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot());
 			ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
 			slotFutures.addLast(slotFuture1);
 			slotFutures.addLast(slotFuture2);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 7d2369f..5c48b56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
@@ -183,7 +183,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger {
 			new Configuration(),
 			scheduledExecutorService,
 			mainThreadExecutor.getMainThreadExecutor(),
-			new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlot())),
+			new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())),
 			ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
 			AkkaUtils.getDefaultTimeout(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 4c59b52..3850a70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -48,10 +47,10 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -62,7 +61,6 @@ import org.junit.After;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.net.InetAddress;
 import java.util.Set;
@@ -444,16 +442,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 		executionGraph.scheduleForExecution();
 
-		final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
-
-		final TestingLogicalSlot slot = createTestingSlot(releaseFuture);
+		final TestingLogicalSlot slot = createTestingSlot();
+		final CompletableFuture<?> releaseFuture = slot.getReleaseFuture();
 		slotFuture1.complete(slot);
 
 		// cancel should change the state of all executions to CANCELLED
 		executionGraph.cancel();
 
 		// complete the now CANCELLED execution --> this should cause a failure
-		slotFuture2.complete(new TestingLogicalSlot());
+		slotFuture2.complete(new TestingLogicalSlotBuilder().createTestingLogicalSlot());
 
 		Thread.sleep(1L);
 		// release the first slot to finish the cancellation
@@ -650,14 +647,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 	}
 
 	@Nonnull
-	private static TestingLogicalSlot createTestingSlot(@Nullable CompletableFuture<?> releaseFuture) {
-		return new TestingLogicalSlot(
-			new LocalTaskManagerLocation(),
-			new SimpleAckingTaskManagerGateway(),
-			0,
-			new AllocationID(),
-			new SlotRequestId(),
-			new SlotSharingGroupId(),
-			releaseFuture);
+	private static TestingLogicalSlot createTestingSlot() {
+		return new TestingLogicalSlotBuilder()
+			.setAutomaticallyCompleteReleaseFuture(false)
+			.createTestingLogicalSlot();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 6c2e02c..25d169c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -555,7 +555,7 @@ public class ExecutionGraphTestUtils {
 		private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
 		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 		private ClassLoader classLoader = getClass().getClassLoader();
-		private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot()));
+		private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
 		private Executor ioExecutor = TestingUtils.defaultExecutor();
 		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
 		private Configuration jobMasterConfig = new Configuration();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 433e3f6..6df8366 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -112,7 +112,7 @@ public class ExecutionTest extends TestLogger {
 			0,
 			new SimpleAckingTaskManagerGateway());
 
-		final LogicalSlot otherSlot = new TestingLogicalSlot();
+		final LogicalSlot otherSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 		CompletableFuture<Execution> allocationFuture = execution.allocateResourcesForExecution(
 			executionGraph.getSlotProviderStrategy(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index c99684e..9172348 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -120,7 +120,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1));
+			LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -155,7 +155,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1));
+			LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -198,7 +198,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1));
+			LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -228,7 +228,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(0));
+			LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(0)).createTestingLogicalSlot();
 
 			setVertexResource(vertex, slot);
 			setVertexState(vertex, ExecutionState.RUNNING);
@@ -305,7 +305,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
 
-				final LogicalSlot slot = new TestingLogicalSlot();
+				final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -347,7 +347,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 						AkkaUtils.getDefaultTimeout());
 				setVertexState(vertex, ExecutionState.CANCELING);
 
-				final LogicalSlot slot = new TestingLogicalSlot();
+				final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
@@ -361,7 +361,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 						AkkaUtils.getDefaultTimeout());
 
-				final LogicalSlot slot = new TestingLogicalSlot();
+				final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 				setVertexResource(vertex, slot);
 				setVertexState(vertex, ExecutionState.CANCELING);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 34eabe8..6093000 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -64,7 +65,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-			final LogicalSlot slot = new TestingLogicalSlot();
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
@@ -98,7 +99,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 
 			final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
 
-			final LogicalSlot slot = new TestingLogicalSlot();
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
@@ -137,7 +138,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
 
-			final LogicalSlot slot = new TestingLogicalSlot();
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -179,7 +180,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
 
-			final LogicalSlot slot = new TestingLogicalSlot(new SubmitFailingSimpleAckingTaskManagerGateway());
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -206,7 +207,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
 
-			final LogicalSlot slot = new TestingLogicalSlot(new SubmitFailingSimpleAckingTaskManagerGateway());
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -244,7 +245,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 				AkkaUtils.getDefaultTimeout());
 
-			TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlot(new SubmitBlockingSimpleAckingTaskManagerGateway());
+			TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitBlockingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(testingLogicalSlot);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index f7a2600..cf55f70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -46,7 +46,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger {
 					AkkaUtils.getDefaultTimeout());
 
 			// a slot than cannot be deployed to
-			final LogicalSlot slot = new TestingLogicalSlot();
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 			slot.releaseSlot(new Exception("Test Exception"));
 
 			assertFalse(slot.isAlive());
@@ -78,7 +78,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger {
 					AkkaUtils.getDefaultTimeout());
 
 			// a slot than cannot be deployed to
-			final LogicalSlot slot = new TestingLogicalSlot();
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 			slot.releaseSlot(new Exception("Test Exception"));
 
 			assertFalse(slot.isAlive());
@@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
-			final LogicalSlot slot = new TestingLogicalSlot();
+			final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
 			CompletableFuture<LogicalSlot> future = CompletableFuture.completedFuture(slot);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 76f7daa..15ec343 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -99,15 +100,14 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 				slot = slots.removeFirst();
 			}
 			if (slot != null) {
-				TestingLogicalSlot result = new TestingLogicalSlot(
-					slot.getTaskManagerLocation(),
-					slot.getTaskManagerGateway(),
-					slot.getPhysicalSlotNumber(),
-					slot.getAllocationId(),
-					slotRequestId,
-					new SlotSharingGroupId(),
-					null,
-					this);
+				TestingLogicalSlot result = new TestingLogicalSlotBuilder()
+					.setTaskManagerLocation(slot.getTaskManagerLocation())
+					.setTaskManagerGateway(slot.getTaskManagerGateway())
+					.setSlotNumber(slot.getPhysicalSlotNumber())
+					.setAllocationId(slot.getAllocationId())
+					.setSlotRequestId(slotRequestId)
+					.setSlotOwner(this)
+					.createTestingLogicalSlot();
 				allocatedSlots.put(slotRequestId, slot);
 				return CompletableFuture.completedFuture(result);
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index 5060478..3304c96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
@@ -47,10 +45,8 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	private final CompletableFuture<?> releaseFuture;
 
-	@Nullable
-	private final CompletableFuture<?> customReleaseFuture;
+	private final boolean automaticallyCompleteReleaseFuture;
 
-	@Nullable
 	private final SlotOwner slotOwner;
 
 	private final AllocationID allocationId;
@@ -59,49 +55,15 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	private final SlotSharingGroupId slotSharingGroupId;
 
-	public TestingLogicalSlot() {
-		this(new SimpleAckingTaskManagerGateway());
-	}
-
-	public TestingLogicalSlot(TaskManagerGateway taskManagerGateway) {
-		this(
-			new LocalTaskManagerLocation(),
-			taskManagerGateway,
-			0,
-			new AllocationID(),
-			new SlotRequestId(),
-			new SlotSharingGroupId(),
-			null);
-	}
-
-	public TestingLogicalSlot(
+	TestingLogicalSlot(
 			TaskManagerLocation taskManagerLocation,
 			TaskManagerGateway taskManagerGateway,
 			int slotNumber,
 			AllocationID allocationId,
 			SlotRequestId slotRequestId,
 			SlotSharingGroupId slotSharingGroupId,
-			@Nullable CompletableFuture<?> customReleaseFuture) {
-		this(
-			taskManagerLocation,
-			taskManagerGateway,
-			slotNumber,
-			allocationId,
-			slotRequestId,
-			slotSharingGroupId,
-			customReleaseFuture,
-			null);
-	}
-
-	public TestingLogicalSlot(
-			TaskManagerLocation taskManagerLocation,
-			TaskManagerGateway taskManagerGateway,
-			int slotNumber,
-			AllocationID allocationId,
-			SlotRequestId slotRequestId,
-			SlotSharingGroupId slotSharingGroupId,
-			@Nullable CompletableFuture<?> customReleaseFuture,
-			@Nullable SlotOwner slotOwner) {
+			boolean automaticallyCompleteReleaseFuture,
+			SlotOwner slotOwner) {
 
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
@@ -111,7 +73,7 @@ public class TestingLogicalSlot implements LogicalSlot {
 		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
 		this.releaseFuture = new CompletableFuture<>();
-		this.customReleaseFuture = customReleaseFuture;
+		this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
 		this.slotOwner = slotOwner;
 	}
 
@@ -132,11 +94,7 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	@Override
 	public boolean isAlive() {
-		if (customReleaseFuture != null) {
-			return !customReleaseFuture.isDone();
-		} else {
-			return !releaseFuture.isDone();
-		}
+		return !releaseFuture.isDone();
 	}
 
 	@Override
@@ -152,16 +110,13 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	@Override
 	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-		if (slotOwner != null) {
-			slotOwner.returnLogicalSlot(this);
-		}
+		slotOwner.returnLogicalSlot(this);
 
-		if (customReleaseFuture != null) {
-			return customReleaseFuture;
-		} else {
+		if (automaticallyCompleteReleaseFuture) {
 			releaseFuture.complete(null);
-			return releaseFuture;
 		}
+
+		return releaseFuture;
 	}
 
 	@Override
@@ -184,4 +139,8 @@ public class TestingLogicalSlot implements LogicalSlot {
 	public SlotSharingGroupId getSlotSharingGroupId() {
 		return slotSharingGroupId;
 	}
+
+	public CompletableFuture<?> getReleaseFuture() {
+		return releaseFuture;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
new file mode 100644
index 0000000..f703c91
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Builder for the {@link TestingLogicalSlot}.
+ */
+public class TestingLogicalSlotBuilder {
+	private TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+	private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+	private int slotNumber = 0;
+	private AllocationID allocationId = new AllocationID();
+	private SlotRequestId slotRequestId = new SlotRequestId();
+	private SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+	private SlotOwner slotOwner = new DummySlotOwner();
+	private boolean automaticallyCompleteReleaseFuture = true;
+
+	public TestingLogicalSlotBuilder setTaskManagerGateway(TaskManagerGateway taskManagerGateway) {
+		this.taskManagerGateway = taskManagerGateway;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {
+		this.taskManagerLocation = taskManagerLocation;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setSlotNumber(int slotNumber) {
+		this.slotNumber = slotNumber;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setAllocationId(AllocationID allocationId) {
+		this.allocationId = allocationId;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setSlotRequestId(SlotRequestId slotRequestId) {
+		this.slotRequestId = slotRequestId;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) {
+		this.slotSharingGroupId = slotSharingGroupId;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setAutomaticallyCompleteReleaseFuture(boolean automaticallyCompleteReleaseFuture) {
+		this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
+		return this;
+	}
+
+	public TestingLogicalSlotBuilder setSlotOwner(SlotOwner slotOwner) {
+		this.slotOwner = slotOwner;
+		return this;
+	}
+
+	public TestingLogicalSlot createTestingLogicalSlot() {
+		return new TestingLogicalSlot(
+			taskManagerLocation,
+			taskManagerGateway,
+			slotNumber,
+			allocationId,
+			slotRequestId,
+			slotSharingGroupId,
+			automaticallyCompleteReleaseFuture,
+			slotOwner);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index 4c0dd01..4088f33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -303,7 +303,7 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 			if (slotAllocationDisabled) {
 				return new CompletableFuture<>();
 			} else {
-				return CompletableFuture.completedFuture(new TestingLogicalSlot());
+				return CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot());
 			}
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
index a92ff2c..d9eae6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
@@ -110,7 +111,7 @@ public class ExecutionGraphToInputsLocationsRetrieverAdapterTest extends TestLog
 	public void testGetTaskManagerLocationWhenScheduled() throws Exception {
 		final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
 
-		final TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlot();
+		final TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), jobVertex);
 		final ExecutionGraphToInputsLocationsRetrieverAdapter inputsLocationsRetriever =
 				new ExecutionGraphToInputsLocationsRetrieverAdapter(eg);


[flink] 02/02: [FLINK-13334][coordination] Remove legacy slot implementation

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f75d8e1fbb16ba08ab5a625f50e988c708a8a2bf
Author: tison <wa...@gmail.com>
AuthorDate: Fri Jul 26 23:44:43 2019 +0800

    [FLINK-13334][coordination] Remove legacy slot implementation
    
    This closes #9245.
---
 .../runtime/executiongraph/ExecutionJobVertex.java |   4 -
 .../apache/flink/runtime/instance/SharedSlot.java  | 333 -----------
 .../apache/flink/runtime/instance/SimpleSlot.java  | 290 ---------
 .../org/apache/flink/runtime/instance/Slot.java    | 381 ------------
 .../instance/SlotSharingGroupAssignment.java       | 664 ---------------------
 .../jobmanager/scheduler/CoLocationConstraint.java |  91 +--
 .../jobmanager/scheduler/CoLocationGroup.java      |   6 -
 .../jobmanager/scheduler/SlotSharingGroup.java     |  23 -
 .../ExecutionGraphDeploymentTest.java              |  24 +-
 .../ExecutionGraphSchedulingTest.java              |  56 +-
 .../ExecutionPartitionLifecycleTest.java           |  13 +-
 .../runtime/executiongraph/ExecutionTest.java      |  34 +-
 .../ExecutionVertexLocalityTest.java               |  18 +-
 .../flink/runtime/instance/SharedSlotsTest.java    | 630 -------------------
 .../flink/runtime/instance/SimpleSlotTest.java     | 135 -----
 .../scheduler/ScheduleWithCoLocationHintTest.java  |  10 -
 16 files changed, 73 insertions(+), 2639 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e883792..1a0d389 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -515,10 +515,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 		synchronized (stateMonitor) {
 			// check and reset the sharing groups with scheduler hints
-			if (slotSharingGroup != null) {
-				slotSharingGroup.clearTaskAssignment();
-			}
-
 			for (int i = 0; i < parallelism; i++) {
 				taskVertices[i].resetForNewExecution(timestamp, expectedGlobalModVersion);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
deleted file mode 100644
index 0f9c104..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-
-import java.util.ConcurrentModificationException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This class represents a shared slot. A shared slot can have multiple
- * {@link SimpleSlot} instances within itself. This allows to
- * schedule multiple tasks simultaneously to the same resource. Sharing a resource with multiple
- * tasks is crucial for simple pipelined / streamed execution, where both the sender and the receiver
- * are typically active at the same time.
- *
- * <p><b>IMPORTANT:</b> This class contains no synchronization. Thus, the caller has to guarantee proper
- * synchronization. In the current implementation, all concurrently modifying operations are
- * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
- * synchronization.
- */
-public class SharedSlot extends Slot implements LogicalSlot {
-
-	/** The assignment group os shared slots that manages the availability and release of the slots */
-	private final SlotSharingGroupAssignment assignmentGroup;
-
-	/** The set os sub-slots allocated from this shared slot */
-	private final Set<Slot> subSlots;
-
-	// ------------------------------------------------------------------------
-	//  Old Constructors (legacy code)
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
-	 * This constructor is used to create a slot directly from an instance. 
-	 *
-	 * @param owner The component from which this slot is allocated.
-	 * @param location The location info of the TaskManager where the slot was allocated from
-	 * @param slotNumber The number of the slot.
-	 * @param taskManagerGateway The gateway to communicate with the TaskManager
-	 * @param assignmentGroup The assignment group that this shared slot belongs to.
-	 */
-	public SharedSlot(
-		SlotOwner owner, TaskManagerLocation location, int slotNumber,
-		TaskManagerGateway taskManagerGateway,
-		SlotSharingGroupAssignment assignmentGroup) {
-
-		this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
-	}
-
-	/**
-	 * Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs
-	 * to the given task group.
-	 *
-	 * @param owner The component from which this slot is allocated.
-	 * @param location The location info of the TaskManager where the slot was allocated from
-	 * @param slotNumber The number of the slot.
-	 * @param taskManagerGateway The gateway to communicate with the TaskManager
-	 * @param assignmentGroup The assignment group that this shared slot belongs to.
-	 * @param parent The parent slot of this slot.
-	 * @param groupId The assignment group of this slot.
-	 */
-	public SharedSlot(
-			SlotOwner owner,
-			TaskManagerLocation location,
-			int slotNumber,
-			TaskManagerGateway taskManagerGateway,
-			SlotSharingGroupAssignment assignmentGroup,
-			@Nullable SharedSlot parent,
-			@Nullable AbstractID groupId) {
-
-		super(owner, location, slotNumber, taskManagerGateway, parent, groupId);
-
-		this.assignmentGroup = checkNotNull(assignmentGroup);
-		this.subSlots = new HashSet<Slot>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Constructors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
-	 * This constructor is used to create a slot directly from an instance.
-	 * 
-	 * @param slotContext The slot context of this shared slot
-	 * @param owner The component from which this slot is allocated.
-	 * @param assignmentGroup The assignment group that this shared slot belongs to.
-	 */
-	public SharedSlot(SlotContext slotContext, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
-		this(slotContext, owner, slotContext.getPhysicalSlotNumber(), assignmentGroup, null, null);
-	}
-
-	private SharedSlot(
-			SlotContext slotInformation,
-			SlotOwner owner,
-			int slotNumber,
-			SlotSharingGroupAssignment assignmentGroup,
-			@Nullable SharedSlot parent,
-			@Nullable AbstractID groupId) {
-
-		super(slotInformation, owner, slotNumber, parent, groupId);
-
-		this.assignmentGroup = checkNotNull(assignmentGroup);
-		this.subSlots = new HashSet<Slot>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int getNumberLeaves() {
-		while (true) {
-			try {
-				int result = 0;
-				for (Slot slot: subSlots){
-					result += slot.getNumberLeaves();
-				}
-				return result;
-			}
-			catch (ConcurrentModificationException e) {
-				// ignore and retry
-			}
-		}
-	}
-
-	/**
-	 * Checks whether this slot is a root slot that has not yet added any child slots.
-	 * 
-	 * @return True, if this slot is a root slot and has not yet added any children, false otherwise.
-	 */
-	public boolean isRootAndEmpty() {
-		return getParent() == null && subSlots.isEmpty();
-	}
-
-	/**
-	 * Checks whether this shared slot has any sub slots.
-	 * 
-	 * @return True, if the shared slot has sub slots, false otherwise.
-	 */
-	public boolean hasChildren() {
-		return subSlots.size() > 0;
-	}
-
-	@Override
-	public Locality getLocality() {
-		return Locality.UNKNOWN;
-	}
-
-	@Override
-	public boolean tryAssignPayload(Payload payload) {
-		throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot.");
-	}
-
-	@Nullable
-	@Override
-	public Payload getPayload() {
-		return null;
-	}
-
-	@Override
-	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-		assignmentGroup.releaseSharedSlot(this);
-
-		if (!(isReleased() && subSlots.isEmpty())) {
-			throw new IllegalStateException("Bug: SharedSlot is not released or not empty after call to releaseSlot()");
-		}
-
-		return CompletableFuture.completedFuture(null);
-	}
-
-	@Override
-	public int getPhysicalSlotNumber() {
-		return getRootSlotNumber();
-	}
-
-	@Override
-	public AllocationID getAllocationId() {
-		return getSlotContext().getAllocationId();
-	}
-
-	@Override
-	public SlotRequestId getSlotRequestId() {
-		return NO_SLOT_REQUEST_ID;
-	}
-
-	@Nullable
-	@Override
-	public SlotSharingGroupId getSlotSharingGroupId() {
-		return NO_SLOT_SHARING_GROUP_ID;
-	}
-
-	/**
-	 * Gets the set of all slots allocated as sub-slots of this shared slot.
-	 *
-	 * @return All sub-slots allocated from this shared slot.
-	 */
-	Set<Slot> getSubSlots() {
-		return subSlots;
-	}
-
-	// ------------------------------------------------------------------------
-	//  INTERNAL : TO BE CALLED ONLY BY THE assignmentGroup - Allocating sub-slots
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new sub slot if the slot is not dead, yet. This method should only be called from
-	 * the assignment group instance to guarantee synchronization.
-	 * 
-	 * <b>NOTE:</b> This method is not synchronized and must only be called from
-	 *              the slot's assignment group.
-	 *
-	 * @param groupId The ID to identify tasks which can be deployed in this sub slot.
-	 * @return The new sub slot if the shared slot is still alive, otherwise null.
-	 */
-	SimpleSlot allocateSubSlot(AbstractID groupId) {
-		if (isAlive()) {
-			SimpleSlot slot = new SimpleSlot(
-				getOwner(),
-				getTaskManagerLocation(),
-				subSlots.size(),
-				getTaskManagerGateway(),
-				this,
-				groupId);
-			subSlots.add(slot);
-			return slot;
-		}
-		else {
-			return null;
-		}
-	}
-
-	/**
-	 * Creates a new sub slot if the slot is not dead, yet. This method should only be called from
-	 * the assignment group instance to guarantee synchronization.
-	 * 
-	 * NOTE: This method should only be called from the slot's assignment group.
-	 *
-	 * @param groupId The ID to identify tasks which can be deployed in this sub slot.
-	 * @return The new sub slot if the shared slot is still alive, otherwise null.
-	 */
-	SharedSlot allocateSharedSlot(AbstractID groupId){
-		if (isAlive()) {
-			SharedSlot slot = new SharedSlot(
-				getOwner(),
-				getTaskManagerLocation(),
-				subSlots.size(),
-				getTaskManagerGateway(),
-				assignmentGroup,
-				this,
-				groupId);
-			subSlots.add(slot);
-			return slot;
-		}
-		else {
-			return null;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  INTERNAL : TO BE CALLED ONLY BY THE assignmentGroup - releasing slots
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Disposes the given sub slot. This method is called by the child simple slot to tell this
-	 * shared slot to release it.
-	 *
-	 * The releasing process itself is done by the {@link SlotSharingGroupAssignment}, which controls
-	 * all the modifications in this shared slot.
-	 *
-	 * NOTE: This method must not modify the shared slot directly !!!
-	 *
-	 * @param slot The sub-slot which shall be removed from the shared slot.
-	 */
-	void releaseChild(SimpleSlot slot) {
-		assignmentGroup.releaseSimpleSlot(slot);
-	}
-	
-	/**
-	 * Removes the given slot from this shared slot. This method Should only be called
-	 * through this shared slot's {@link SlotSharingGroupAssignment}
-	 *
-	 * @param slot slot to be removed from the set of sub slots.
-	 * @return Number of remaining sub slots
-	 */
-	int removeDisposedChildSlot(Slot slot) {
-		if (!slot.isReleased() || !subSlots.remove(slot)) {
-			throw new IllegalArgumentException();
-		}
-		return subSlots.size();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return "Shared " + super.toString();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
deleted file mode 100644
index 3363b15..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- * A SimpleSlot represents a single slot on a TaskManager instance, or a slot within a shared slot.
- *
- * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
- * If not, then the parent attribute is null.
- */
-public class SimpleSlot extends Slot implements LogicalSlot {
-
-	/** The updater used to atomically swap in the payload */
-	private static final AtomicReferenceFieldUpdater<SimpleSlot, Payload> PAYLOAD_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Payload.class, "payload");
-
-	// ------------------------------------------------------------------------
-
-	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
-
-	/** Id of the task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
-	private volatile Payload payload;
-
-	/** The locality attached to the slot, defining whether the slot was allocated at the desired location. */
-	private volatile Locality locality = Locality.UNCONSTRAINED;
-
-	// ------------------------------------------------------------------------
-	//  Old Constructors (legacy mode)
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new simple slot that stands alone and does not belong to shared slot.
-	 * 
-	 * @param owner The component from which this slot is allocated.
-	 * @param location The location info of the TaskManager where the slot was allocated from
-	 * @param slotNumber The number of the task slot on the instance.
-	 * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
-	 */
-	public SimpleSlot(
-		SlotOwner owner, TaskManagerLocation location, int slotNumber,
-		TaskManagerGateway taskManagerGateway) {
-		this(owner, location, slotNumber, taskManagerGateway, null, null);
-	}
-
-	/**
-	 * Creates a new simple slot that belongs to the given shared slot and
-	 * is identified by the given ID.
-	 *
-	 * @param owner The component from which this slot is allocated.
-	 * @param location The location info of the TaskManager where the slot was allocated from
-	 * @param slotNumber The number of the simple slot in its parent shared slot.
-	 * @param taskManagerGateway to communicate with the associated task manager.
-	 * @param parent The parent shared slot.
-	 * @param groupID The ID that identifies the group that the slot belongs to.
-	 */
-	public SimpleSlot(
-			SlotOwner owner,
-			TaskManagerLocation location,
-			int slotNumber,
-			TaskManagerGateway taskManagerGateway,
-			@Nullable SharedSlot parent,
-			@Nullable AbstractID groupID) {
-
-		super(
-			parent != null ?
-				parent.getSlotContext() :
-				new SimpleSlotContext(
-					NO_ALLOCATION_ID,
-					location,
-					slotNumber,
-					taskManagerGateway),
-			owner,
-			slotNumber,
-			parent,
-			groupID);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Constructors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new simple slot that stands alone and does not belong to shared slot.
-	 *
-	 * @param slotContext The slot context of this simple slot
-	 * @param owner The component from which this slot is allocated.
-	 */
-	public SimpleSlot(SlotContext slotContext, SlotOwner owner, int slotNumber) {
-		this(slotContext, owner, slotNumber, null, null);
-	}
-
-	/**
-	 * Creates a new simple slot that belongs to the given shared slot and
-	 * is identified by the given ID..
-	 *
-	 * @param parent The parent shared slot.
-	 * @param owner The component from which this slot is allocated.
-	 * @param groupID The ID that identifies the group that the slot belongs to.
-	 */
-	public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) {
-		this(parent.getSlotContext(), owner, slotNumber, parent, groupID);
-	}
-	
-	/**
-	 * Creates a new simple slot that belongs to the given shared slot and
-	 * is identified by the given ID..
-	 *
-	 * @param slotContext The slot context of this simple slot
-	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the simple slot in its parent shared slot.
-	 * @param parent The parent shared slot.
-	 * @param groupID The ID that identifies the group that the slot belongs to.
-	 */
-	private SimpleSlot(
-			SlotContext slotContext,
-			SlotOwner owner,
-			int slotNumber,
-			@Nullable SharedSlot parent,
-			@Nullable AbstractID groupID) {
-		super(slotContext, owner, slotNumber, parent, groupID);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int getNumberLeaves() {
-		return 1;
-	}
-
-	/**
-	 * Atomically sets the executed vertex, if no vertex has been assigned to this slot so far.
-	 *
-	 * @param payload The vertex to assign to this slot.
-	 * @return True, if the vertex was assigned, false, otherwise.
-	 */
-	@Override
-	public boolean tryAssignPayload(Payload payload) {
-		Preconditions.checkNotNull(payload);
-
-		// check that we can actually run in this slot
-		if (isCanceled()) {
-			return false;
-		}
-
-		// atomically assign the vertex
-		if (!PAYLOAD_UPDATER.compareAndSet(this, null, payload)) {
-			return false;
-		}
-
-		// we need to do a double check that we were not cancelled in the meantime
-		if (isCanceled()) {
-			this.payload = null;
-			return false;
-		}
-
-		return true;
-	}
-
-	@Nullable
-	@Override
-	public Payload getPayload() {
-		return payload;
-	}
-
-	/**
-	 * Gets the locality information attached to this slot.
-	 * @return The locality attached to the slot.
-	 */
-	public Locality getLocality() {
-		return locality;
-	}
-
-	/**
-	 * Attached locality information to this slot.
-	 * @param locality The locality attached to the slot.
-	 */
-	public void setLocality(Locality locality) {
-		this.locality = locality;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cancelling & Releasing
-	// ------------------------------------------------------------------------
-
-	@Override
-	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-		if (!isCanceled()) {
-			final CompletableFuture<?> terminationFuture;
-
-			if (payload != null) {
-				// trigger the failure of the slot payload
-				payload.fail(cause != null ? cause : new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));
-
-				// wait for the termination of the payload before releasing the slot
-				terminationFuture = payload.getTerminalStateFuture();
-			} else {
-				terminationFuture = CompletableFuture.completedFuture(null);
-			}
-
-			terminationFuture.whenComplete(
-				(Object ignored, Throwable throwable) -> {
-					// release directly (if we are directly allocated),
-					// otherwise release through the parent shared slot
-					if (getParent() == null) {
-						// we have to give back the slot to the owning instance
-						if (markCancelled()) {
-							try {
-								getOwner().returnLogicalSlot(this);
-								releaseFuture.complete(null);
-							} catch (Exception e) {
-								releaseFuture.completeExceptionally(e);
-							}
-						}
-					} else {
-						// we have to ask our parent to dispose us
-						getParent().releaseChild(this);
-
-						releaseFuture.complete(null);
-					}
-				});
-		}
-
-		return releaseFuture;
-	}
-
-	@Override
-	public int getPhysicalSlotNumber() {
-		return getRootSlotNumber();
-	}
-
-	@Override
-	public AllocationID getAllocationId() {
-		return getSlotContext().getAllocationId();
-	}
-
-	@Override
-	public SlotRequestId getSlotRequestId() {
-		return NO_SLOT_REQUEST_ID;
-	}
-
-	@Nullable
-	@Override
-	public SlotSharingGroupId getSlotSharingGroupId() {
-		return NO_SLOT_SHARING_GROUP_ID;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return "SimpleSlot " + super.toString();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
deleted file mode 100644
index 9c1b627..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for slots that the Scheduler / ExecutionGraph take from the SlotPool and use to place
- * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a TaskManager's resources),
- * plus additional fields to track what is currently executed in that slot, or if the slot is still
- * used or disposed (ExecutionGraph gave it back to the pool).
- *
- * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). In the more complex
- * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. Shared slots may contain
- * other shared slots which in turn can hold simple slots. That way, a shared slot may define a tree
- * of slots that belong to it.
- */
-public abstract class Slot {
-
-	/** Updater for atomic state transitions */
-	private static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
-			AtomicIntegerFieldUpdater.newUpdater(Slot.class, "status");
-
-	/** State where slot is fresh and alive. Tasks may be added to the slot. */
-	private static final int ALLOCATED_AND_ALIVE = 0;
-
-	/** State where the slot has been canceled and is in the process of being released */
-	private static final int CANCELLED = 1;
-
-	/** State where all tasks in this slot have been canceled and the slot been given back to the instance */
-	private static final int RELEASED = 2;
-
-	// temporary placeholder for Slots that are not constructed from an AllocatedSlot (by legacy code)
-	protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L);
-	protected static final SlotRequestId NO_SLOT_REQUEST_ID = new SlotRequestId(0L, 0L);
-	protected static final SlotSharingGroupId NO_SLOT_SHARING_GROUP_ID = new SlotSharingGroupId(0L, 0L);
-
-	// ------------------------------------------------------------------------
-
-	/** Context of this logical slot. */
-	private final SlotContext slotContext;
-
-	/** The owner of this slot - the slot was taken from that owner and must be disposed to it */
-	private final SlotOwner owner;
-
-	/** The parent of this slot in the hierarchy, or null, if this is the parent */
-	@Nullable
-	private final SharedSlot parent;
-
-	/** The id of the group that this slot is allocated to. May be null. */
-	@Nullable
-	private final AbstractID groupID;
-
-	private final int slotNumber;
-
-	/** The state of the vertex, only atomically updated */
-	private volatile int status = ALLOCATED_AND_ALIVE;
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Base constructor for slots.
-	 * 
-	 * <p>This is the old way of constructing slots by the legacy code
-	 * 
-	 * @param owner The component from which this slot is allocated.
-	 * @param location The location info of the TaskManager where the slot was allocated from
-	 * @param slotNumber The number of this slot.
-	 * @param taskManagerGateway The actor gateway to communicate with the TaskManager
-	 * @param parent The parent slot that contains this slot. May be null, if this slot is the root.
-	 * @param groupID The ID that identifies the task group for which this slot is allocated. May be null
-	 *                if the slot does not belong to any task group.   
-	 */
-	protected Slot(
-			SlotOwner owner,
-			TaskManagerLocation location,
-			int slotNumber,
-			TaskManagerGateway taskManagerGateway,
-			@Nullable SharedSlot parent,
-			@Nullable AbstractID groupID) {
-
-		checkArgument(slotNumber >= 0);
-
-		// create a simple slot context
-		this.slotContext = new SimpleSlotContext(
-			NO_ALLOCATION_ID,
-			location,
-			slotNumber,
-			taskManagerGateway);
-
-		this.owner = checkNotNull(owner);
-		this.parent = parent; // may be null
-		this.groupID = groupID; // may be null
-		this.slotNumber = slotNumber;
-	}
-
-	/**
-	 * Base constructor for slots.
-	 *
-	 * @param slotContext The slot context of this slot.
-	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of this slot.
-	 * @param parent The parent slot that contains this slot. May be null, if this slot is the root.
-	 * @param groupID The ID that identifies the task group for which this slot is allocated. May be null
-	 *                if the slot does not belong to any task group.   
-	 */
-	protected Slot(
-			SlotContext slotContext,
-			SlotOwner owner,
-			int slotNumber,
-			@Nullable SharedSlot parent,
-			@Nullable AbstractID groupID) {
-
-		this.slotContext = checkNotNull(slotContext);
-		this.owner = checkNotNull(owner);
-		this.parent = parent; // may be null
-		this.groupID = groupID; // may be null
-		this.slotNumber = slotNumber;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the allocated slot that this slot refers to.
-	 * 
-	 * @return This slot's allocated slot.
-	 */
-	public SlotContext getSlotContext() {
-		return slotContext;
-	}
-
-	/**
-	 * Gets the ID of the TaskManager that offers this slot.
-	 *
-	 * @return The ID of the TaskManager that offers this slot
-	 */
-	public ResourceID getTaskManagerID() {
-		return slotContext.getTaskManagerLocation().getResourceID();
-	}
-
-	/**
-	 * Gets the location info of the TaskManager that offers this slot.
-	 *
-	 * @return The location info of the TaskManager that offers this slot
-	 */
-	public TaskManagerLocation getTaskManagerLocation() {
-		return slotContext.getTaskManagerLocation();
-	}
-
-	/**
-	 * Gets the actor gateway that can be used to send messages to the TaskManager.
-	 *
-	 * <p>This method should be removed once the new interface-based RPC abstraction is in place
-	 *
-	 * @return The actor gateway that can be used to send messages to the TaskManager.
-	 */
-	public TaskManagerGateway getTaskManagerGateway() {
-		return slotContext.getTaskManagerGateway();
-	}
-
-	/**
-	 * Gets the owner of this slot. The owner is the component that the slot was created from
-	 * and to which it needs to be returned after the executed tasks are done.
-	 * 
-	 * @return The owner of this slot.
-	 */
-	public SlotOwner getOwner() {
-		return owner;
-	}
-
-	/**
-	 * Gets the number of the slot. For a simple slot, that is the number of the slot
-	 * on its instance. For a non-root slot, this returns the number of the slot in the
-	 * list amongst its siblings in the tree.
-	 *
-	 * @return The number of the slot on the instance or amongst its siblings that share the same slot.
-	 */
-	public int getSlotNumber() {
-		return slotNumber;
-	}
-
-	/**
-	 * Gets the number of the root slot. This code behaves equal to {@code getRoot().getSlotNumber()}.
-	 * If this slot is the root of the tree of shared slots, then this method returns the same
-	 * value as {@link #getSlotNumber()}.
-	 *
-	 * @return The slot number of the root slot.
-	 */
-	public int getRootSlotNumber() {
-		if (parent == null) {
-			return slotNumber;
-		} else {
-			return parent.getRootSlotNumber();
-		}
-	}
-
-	/**
-	 * Gets the ID that identifies the logical group to which this slot belongs:
-	 * <ul>
-	 *     <li>If the slot does not belong to any group in particular, this field is null.</li>
-	 *     <li>If this slot was allocated as a sub-slot of a
-	 *         {@link org.apache.flink.runtime.instance.SlotSharingGroupAssignment}, 
-	 *         then this ID will be the JobVertexID of the vertex whose task the slot
-	 *         holds in its shared slot.</li>
-	 *     <li>In case that the slot represents the shared slot of a co-location constraint, this ID will be the
-	 *         ID of the co-location constraint.</li>
-	 * </ul>
-	 * 
-	 * @return The ID identifying the logical group of slots.
-	 */
-	@Nullable
-	public AbstractID getGroupID() {
-		return groupID;
-	}
-
-	/**
-	 * Gets the parent slot of this slot. Returns null, if this slot has no parent.
-	 * 
-	 * @return The parent slot, or null, if no this slot has no parent.
-	 */
-	@Nullable
-	public SharedSlot getParent() {
-		return parent;
-	}
-
-	/**
-	 * Gets the root slot of the tree containing this slot. If this slot is the root,
-	 * the method returns this slot directly, otherwise it recursively goes to the parent until
-	 * it reaches the root.
-	 * 
-	 * @return The root slot of the tree containing this slot
-	 */
-	public Slot getRoot() {
-		if (parent == null) {
-			return this;
-		} else {
-			return parent.getRoot();
-		}
-	}
-
-	/**
-	 * Gets the number of simple slots that are at the leaves of the tree of slots.
-	 *
-	 * @return The number of simple slots at the leaves.
-	 */
-	public abstract int getNumberLeaves();
-
-	// --------------------------------------------------------------------------------------------
-	//  Status and life cycle
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Checks of the slot is still alive, i.e. in state {@link #ALLOCATED_AND_ALIVE}.
-	 *
-	 * @return True if the slot is alive, false otherwise.
-	 */
-	public boolean isAlive() {
-		return status == ALLOCATED_AND_ALIVE;
-	}
-
-	/**
-	 * Checks of the slot has been cancelled. Note that a released slot is also cancelled.
-	 *
-	 * @return True if the slot is cancelled or released, false otherwise.
-	 */
-	public boolean isCanceled() {
-		return status != ALLOCATED_AND_ALIVE;
-	}
-
-	/**
-	 * Checks of the slot has been released.
-	 *
-	 * @return True if the slot is released, false otherwise.
-	 */
-	public boolean isReleased() {
-		return status == RELEASED;
-	}
-
-	/**
-	 * Atomically marks the slot as cancelled, if it was alive before.
-	 *
-	 * @return True, if the state change was successful, false otherwise.
-	 */
-	final boolean markCancelled() {
-		return STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED);
-	}
-
-	/**
-	 * Atomically marks the slot as released, if it was cancelled before.
-	 *
-	 * @return True, if the state change was successful, false otherwise.
-	 */
-	final boolean markReleased() {
-		return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
-	}
-
-	/**
-	 * This method cancels and releases the slot and all its sub-slots.
-	 * 
-	 * After this method completed successfully, the slot will be in state "released", and the
-	 * {@link #isReleased()} method will return {@code true}.
-	 * 
-	 * If this slot is a simple slot, it will be returned to its instance. If it is a shared slot,
-	 * it will release all of its sub-slots and release itself.
-	 */
-	public abstract CompletableFuture<?> releaseSlot(@Nullable Throwable cause);
-
-
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Slots must always has based on reference identity.
-	 */
-	@Override
-	public final int hashCode() {
-		return super.hashCode();
-	}
-
-	/**
-	 * Slots must always compare on referential equality.
-	 */
-	@Override
-	public final boolean equals(Object obj) {
-		return this == obj;
-	}
-
-	@Override
-	public String toString() {
-		return hierarchy() + " - " + getTaskManagerLocation() + " - " + getStateName(status);
-	}
-
-	protected String hierarchy() {
-		return (getParent() != null ? getParent().hierarchy() : "") + '(' + getSlotNumber() + ')';
-	}
-
-	private static String getStateName(int state) {
-		switch (state) {
-			case ALLOCATED_AND_ALIVE:
-				return "ALLOCATED/ALIVE";
-			case CANCELLED:
-				return "CANCELLED";
-			case RELEASED:
-				return "RELEASED";
-			default:
-				return "(unknown)";
-		}
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
deleted file mode 100644
index d16c332..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * The SlotSharingGroupAssignment manages a set of shared slots, which are shared between
- * tasks of a {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}.
- * 
- * <p>The assignments shares tasks by allowing a shared slot to hold one vertex per
- * JobVertexID. For example, consider a program consisting of job vertices "source", "map",
- * "reduce", and "sink". If the slot sharing group spans all four job vertices, then
- * each shared slot can hold one parallel subtask of the source, the map, the reduce, and the
- * sink vertex. Each shared slot holds the actual subtasks in child slots, which are (at the leaf level),
- * the {@link SimpleSlot}s.</p>
- * 
- * <p>An exception are the co-location-constraints, that define that the i-th subtask of one
- * vertex needs to be scheduled strictly together with the i-th subtasks of the vertices
- * that share the co-location-constraint. To manage that, a co-location-constraint gets its
- * own shared slot inside the shared slots of a sharing group.</p>
- * 
- * <p>Consider a job set up like this:</p>
- * 
- * <pre>{@code
- * +-------------- Slot Sharing Group --------------+
- * |                                                |
- * |            +-- Co Location Group --+           |
- * |            |                       |           |
- * |  (source) ---> (head) ---> (tail) ---> (sink)  |
- * |            |                       |           |
- * |            +-----------------------+           |
- * +------------------------------------------------+
- * }</pre>
- * 
- * <p>The slot hierarchy in the slot sharing group will look like the following</p> 
- * 
- * <pre>
- *     Shared(0)(root)
- *        |
- *        +-- Simple(2)(sink)
- *        |
- *        +-- Shared(1)(co-location-group)
- *        |      |
- *        |      +-- Simple(0)(tail)
- *        |      +-- Simple(1)(head)
- *        |
- *        +-- Simple(0)(source)
- * </pre>
- */
-public class SlotSharingGroupAssignment {
-
-	private final static Logger LOG = LoggerFactory.getLogger(SlotSharingGroupAssignment.class);
-
-	/** The lock globally guards against concurrent modifications in the data structures */
-	private final Object lock = new Object();
-	
-	/** All slots currently allocated to this sharing group */
-	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
-
-	/** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */
-	private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
-
-
-	// --------------------------------------------------------------------------------------------
-	//  Accounting
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the number of slots that are currently governed by this assignment group.
-	 * This refers to the slots allocated from an Instance,
-	 * and not the sub-slots given out as children of those shared slots.
-	 * 
-	 * @return The number of resource slots managed by this assignment group.
-	 */
-	public int getNumberOfSlots() {
-		return allSlots.size();
-	}
-
-	/**
-	 * Gets the number of shared slots into which the given group can place subtasks or 
-	 * nested task groups.
-	 * 
-	 * @param groupId The ID of the group.
-	 * @return The number of shared slots available to the given job vertex.
-	 */
-	public int getNumberOfAvailableSlotsForGroup(AbstractID groupId) {
-		synchronized (lock) {
-			Map<ResourceID, List<SharedSlot>> available = availableSlotsPerJid.get(groupId);
-
-			if (available != null) {
-				Set<SharedSlot> set = new HashSet<SharedSlot>();
-
-				for (List<SharedSlot> list : available.values()) {
-					for (SharedSlot slot : list) {
-						set.add(slot);
-					}
-				}
-
-				return set.size();
-			}
-			else {
-				// if no entry exists for a JobVertexID so far, then the vertex with that ID can
-				// add a subtask into each shared slot of this group. Consequently, all
-				// of them are available for that JobVertexID.
-				return allSlots.size();
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Slot allocation
-	// ------------------------------------------------------------------------
-
-	public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality, JobVertexID groupId) {
-		return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupId, null);
-	}
-
-	public SimpleSlot addSharedSlotAndAllocateSubSlot(
-			SharedSlot sharedSlot, Locality locality, CoLocationConstraint constraint)
-	{
-		return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, null, constraint);
-	}
-
-	private SimpleSlot addSharedSlotAndAllocateSubSlot(
-			SharedSlot sharedSlot, Locality locality, JobVertexID groupId, CoLocationConstraint constraint) {
-
-		// sanity checks
-		if (!sharedSlot.isRootAndEmpty()) {
-			throw new IllegalArgumentException("The given slot is not an empty root slot.");
-		}
-
-		final ResourceID location = sharedSlot.getTaskManagerID();
-
-		synchronized (lock) {
-			// early out in case that the slot died (instance disappeared)
-			if (!sharedSlot.isAlive()) {
-				return null;
-			}
-			
-			// add to the total bookkeeping
-			if (!allSlots.add(sharedSlot)) {
-				throw new IllegalArgumentException("Slot was already contained in the assignment group");
-			}
-			
-			SimpleSlot subSlot;
-			AbstractID groupIdForMap;
-					
-			if (constraint == null) {
-				// allocate us a sub slot to return
-				subSlot = sharedSlot.allocateSubSlot(groupId);
-				groupIdForMap = groupId;
-			}
-			else {
-				// sanity check
-				if (constraint.isAssignedAndAlive()) {
-					throw new IllegalStateException(
-							"Trying to add a shared slot to a co-location constraint that has a life slot.");
-				}
-				
-				// we need a co-location slot --> a SimpleSlot nested in a SharedSlot to
-				//                                host other co-located tasks
-				SharedSlot constraintGroupSlot = sharedSlot.allocateSharedSlot(constraint.getGroupId());
-				groupIdForMap = constraint.getGroupId();
-				
-				if (constraintGroupSlot != null) {
-					// the sub-slots in the co-location constraint slot have no own group IDs
-					subSlot = constraintGroupSlot.allocateSubSlot(null);
-					if (subSlot != null) {
-						// all went well, we can give the constraint its slot
-						constraint.setSharedSlot(constraintGroupSlot);
-						
-						// NOTE: Do not lock the location constraint, because we don't yet know whether we will
-						// take the slot here
-					}
-					else {
-						// if we could not create a sub slot, release the co-location slot
-						// note that this does implicitly release the slot we have just added
-						// as well, because we release its last child slot. That is expected
-						// and desired.
-						constraintGroupSlot.releaseSlot(new FlinkException("Could not create a sub slot in this shared slot."));
-					}
-				}
-				else {
-					// this should not happen, as we are under the lock that also
-					// guards slot disposals. Keep the check to be on the safe side
-					subSlot = null;
-				}
-			}
-			
-			if (subSlot != null) {
-				// preserve the locality information
-				subSlot.setLocality(locality);
-				
-				// let the other groups know that this slot exists and that they
-				// can place a task into this slot.
-				boolean entryForNewJidExists = false;
-				
-				for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
-					// there is already an entry for this groupID
-					if (entry.getKey().equals(groupIdForMap)) {
-						entryForNewJidExists = true;
-						continue;
-					}
-
-					Map<ResourceID, List<SharedSlot>> available = entry.getValue();
-					putIntoMultiMap(available, location, sharedSlot);
-				}
-
-				// make sure an empty entry exists for this group, if no other entry exists
-				if (!entryForNewJidExists) {
-					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
-				}
-
-				return subSlot;
-			}
-			else {
-				// if sharedSlot is releases, abort.
-				// This should be a rare case, since this method is called with a fresh slot.
-				return null;
-			}
-		}
-		// end synchronized (lock)
-	}
-
-	/**
-	 * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
-	 * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local
-	 * slots if no local slot is available. The method returns null, when this sharing group has
-	 * no slot available for the given JobVertexID.
-	 *
-	 * @param vertexID the vertex id
-	 * @param locationPreferences location preferences
-	 *
-	 * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
-	 */
-	public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
-		synchronized (lock) {
-			Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(vertexID, locationPreferences, false);
-
-			if (p != null) {
-				SharedSlot ss = p.f0;
-				SimpleSlot slot = ss.allocateSubSlot(vertexID);
-				slot.setLocality(p.f1);
-				return slot;
-			}
-			else {
-				return null;
-			}
-		}
-	}
-
-	/**
-	 * Gets a slot for a task that has a co-location constraint. This method tries to grab
-	 * a slot form the location-constraint's shared slot. If that slot has not been initialized,
-	 * then the method tries to grab another slot that is available for the location-constraint-group.
-	 * 
-	 * <p>In cases where the co-location constraint has not yet been initialized with a slot,
-	 * or where that slot has been disposed in the meantime, this method tries to allocate a shared
-	 * slot for the co-location constraint (inside on of the other available slots).</p>
-	 * 
-	 * <p>If a suitable shared slot is available, this method allocates a simple slot within that
-	 * shared slot and returns it. If no suitable shared slot could be found, this method
-	 * returns null.</p>
-	 * 
-	 * @param constraint The co-location constraint for the placement of the execution vertex.
-	 * @param locationPreferences location preferences
-	 * 
-	 * @return A simple slot allocate within a suitable shared slot, or {@code null}, if no suitable
-	 *         shared slot is available.
-	 */
-	public SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
-		synchronized (lock) {
-			if (constraint.isAssignedAndAlive()) {
-				// the shared slot of the co-location group is initialized and set we allocate a sub-slot
-				final SharedSlot shared = constraint.getSharedSlot();
-				SimpleSlot subslot = shared.allocateSubSlot(null);
-				subslot.setLocality(Locality.LOCAL);
-				return subslot;
-			}
-			else if (constraint.isAssigned()) {
-				// we had an assignment before.
-				
-				SharedSlot previous = constraint.getSharedSlot();
-				if (previous == null) {
-					throw new IllegalStateException("Bug: Found assigned co-location constraint without a slot.");
-				}
-
-				TaskManagerLocation location = previous.getTaskManagerLocation();
-				Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(
-						constraint.getGroupId(), Collections.singleton(location), true);
-
-				if (p == null) {
-					return null;
-				}
-				else {
-					SharedSlot newSharedSlot = p.f0;
-
-					// allocate the co-location group slot inside the shared slot
-					SharedSlot constraintGroupSlot = newSharedSlot.allocateSharedSlot(constraint.getGroupId());
-					if (constraintGroupSlot != null) {
-						constraint.setSharedSlot(constraintGroupSlot);
-
-						// the sub slots in the co location constraint slot have no group that they belong to
-						// (other than the co-location-constraint slot)
-						SimpleSlot subSlot = constraintGroupSlot.allocateSubSlot(null);
-						subSlot.setLocality(Locality.LOCAL);
-						return subSlot;
-					}
-					else {
-						// could not allocate the co-location-constraint shared slot
-						return null;
-					}
-				}
-			}
-			else {
-				// the location constraint has not been associated with a shared slot, yet.
-				// grab a new slot and initialize the constraint with that one.
-				// preferred locations are defined by the vertex
-				Tuple2<SharedSlot, Locality> p =
-						getSharedSlotForTask(constraint.getGroupId(), locationPreferences, false);
-				if (p == null) {
-					// could not get a shared slot for this co-location-group
-					return null;
-				}
-				else {
-					final SharedSlot availableShared = p.f0;
-					final Locality l = p.f1;
-
-					// allocate the co-location group slot inside the shared slot
-					SharedSlot constraintGroupSlot = availableShared.allocateSharedSlot(constraint.getGroupId());
-					
-					// IMPORTANT: We do not lock the location, yet, since we cannot be sure that the
-					//            caller really sticks with the slot we picked!
-					constraint.setSharedSlot(constraintGroupSlot);
-					
-					// the sub slots in the co location constraint slot have no group that they belong to
-					// (other than the co-location-constraint slot)
-					SimpleSlot sub = constraintGroupSlot.allocateSubSlot(null);
-					sub.setLocality(l);
-					return sub;
-				}
-			}
-		}
-	}
-
-
-	public Tuple2<SharedSlot, Locality> getSharedSlotForTask(
-			AbstractID groupId,
-			Iterable<TaskManagerLocation> preferredLocations,
-			boolean localOnly) {
-		// check if there is anything at all in this group assignment
-		if (allSlots.isEmpty()) {
-			return null;
-		}
-
-		// get the available slots for the group
-		Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
-		
-		if (slotsForGroup == null) {
-			// we have a new group, so all slots are available
-			slotsForGroup = new LinkedHashMap<>();
-			availableSlotsPerJid.put(groupId, slotsForGroup);
-
-			for (SharedSlot availableSlot : allSlots) {
-				putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
-			}
-		}
-		else if (slotsForGroup.isEmpty()) {
-			// the group exists, but nothing is available for that group
-			return null;
-		}
-
-		// check whether we can schedule the task to a preferred location
-		boolean didNotGetPreferred = false;
-
-		if (preferredLocations != null) {
-			for (TaskManagerLocation location : preferredLocations) {
-
-				// set the flag that we failed a preferred location. If one will be found,
-				// we return early anyways and skip the flag evaluation
-				didNotGetPreferred = true;
-
-				SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
-				if (slot != null && slot.isAlive()) {
-					return new Tuple2<>(slot, Locality.LOCAL);
-				}
-			}
-		}
-
-		// if we want only local assignments, exit now with a "not found" result
-		if (didNotGetPreferred && localOnly) {
-			return null;
-		}
-
-		Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;
-
-		// schedule the task to any available location
-		SharedSlot slot;
-		while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
-			if (slot.isAlive()) {
-				return new Tuple2<>(slot, locality);
-			}
-		}
-		
-		// nothing available after all, all slots were dead
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Slot releasing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Releases the simple slot from the assignment group.
-	 * 
-	 * @param simpleSlot The SimpleSlot to be released
-	 */
-	void releaseSimpleSlot(SimpleSlot simpleSlot) {
-		synchronized (lock) {
-			// try to transition to the CANCELED state. That state marks
-			// that the releasing is in progress
-			if (simpleSlot.markCancelled()) {
-
-				// sanity checks
-				if (simpleSlot.isAlive()) {
-					throw new IllegalStateException("slot is still alive");
-				}
-
-				// check whether the slot is already released
-				if (simpleSlot.markReleased()) {
-					LOG.debug("Release simple slot {}.", simpleSlot);
-
-					AbstractID groupID = simpleSlot.getGroupID();
-					SharedSlot parent = simpleSlot.getParent();
-
-					// if we have a group ID, then our parent slot is tracked here
-					if (groupID != null && !allSlots.contains(parent)) {
-						throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
-					}
-
-					int parentRemaining = parent.removeDisposedChildSlot(simpleSlot);
-
-					if (parentRemaining > 0) {
-						// the parent shared slot is still alive. make sure we make it
-						// available again to the group of the just released slot
-
-						if (groupID != null) {
-							// if we have a group ID, then our parent becomes available
-							// for that group again. otherwise, the slot is part of a
-							// co-location group and nothing becomes immediately available
-
-							Map<ResourceID, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
-
-							// sanity check
-							if (slotsForJid == null) {
-								throw new IllegalStateException("Trying to return a slot for group " + groupID +
-										" when available slots indicated that all slots were available.");
-							}
-
-							putIntoMultiMap(slotsForJid, parent.getTaskManagerID(), parent);
-						}
-					} else {
-						// the parent shared slot is now empty and can be released
-						parent.markCancelled();
-						internalDisposeEmptySharedSlot(parent);
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot(Throwable)}.
-	 * 
-	 * @param sharedSlot The slot to be released.
-	 */
-	void releaseSharedSlot(SharedSlot sharedSlot) {
-		synchronized (lock) {
-			if (sharedSlot.markCancelled()) {
-				// we are releasing this slot
-				
-				if (sharedSlot.hasChildren()) {
-					final FlinkException cause = new FlinkException("Releasing shared slot parent.");
-					// by simply releasing all children, we should eventually release this slot.
-					Set<Slot> children = sharedSlot.getSubSlots();
-					while (children.size() > 0) {
-						children.iterator().next().releaseSlot(cause);
-					}
-				}
-				else {
-					// if there are no children that trigger the release, we trigger it directly
-					internalDisposeEmptySharedSlot(sharedSlot);
-				}
-			}
-		}
-	}
-
-	/**
-	 * 
-	 * <p><b>NOTE: This method must be called from within a scope that holds the lock.</b></p>
-	 */
-	private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {
-		// sanity check
-		if (sharedSlot.isAlive() || !sharedSlot.getSubSlots().isEmpty()) {
-			throw new IllegalArgumentException();
-		}
-		
-		final SharedSlot parent = sharedSlot.getParent();
-		final AbstractID groupID = sharedSlot.getGroupID();
-		
-		// 1) If we do not have a parent, we are a root slot.
-		// 2) If we are not a root slot, we are a slot with a groupID and our parent
-		//    becomes available for that group
-		
-		if (parent == null) {
-			// root slot, return to the instance.
-			sharedSlot.getOwner().returnLogicalSlot(sharedSlot);
-
-			// also, make sure we remove this slot from everywhere
-			allSlots.remove(sharedSlot);
-			removeSlotFromAllEntries(availableSlotsPerJid, sharedSlot);
-		}
-		else if (groupID != null) {
-			// we remove ourselves from our parent slot
-
-			if (sharedSlot.markReleased()) {
-				LOG.debug("Internally dispose empty shared slot {}.", sharedSlot);
-
-				int parentRemaining = parent.removeDisposedChildSlot(sharedSlot);
-				
-				if (parentRemaining > 0) {
-					// the parent becomes available for the group again
-					Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupID);
-
-					// sanity check
-					if (slotsForGroup == null) {
-						throw new IllegalStateException("Trying to return a slot for group " + groupID +
-								" when available slots indicated that all slots were available.");
-					}
-
-					putIntoMultiMap(slotsForGroup, parent.getTaskManagerID(), parent);
-					
-				}
-				else {
-					// this was the last child of the parent. release the parent.
-					parent.markCancelled();
-					internalDisposeEmptySharedSlot(parent);
-				}
-			}
-		}
-		else {
-			throw new IllegalStateException(
-					"Found a shared slot that is neither a root slot, nor associated with a vertex group.");
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location, SharedSlot slot) {
-		List<SharedSlot> slotsForInstance = map.get(location);
-		if (slotsForInstance == null) {
-			slotsForInstance = new ArrayList<SharedSlot>();
-			map.put(location, slotsForInstance);
-		}
-		slotsForInstance.add(slot);
-	}
-	
-	private static SharedSlot removeFromMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location) {
-		List<SharedSlot> slotsForLocation = map.get(location);
-		
-		if (slotsForLocation == null) {
-			return null;
-		}
-		else {
-			SharedSlot slot = slotsForLocation.remove(slotsForLocation.size() - 1);
-			if (slotsForLocation.isEmpty()) {
-				map.remove(location);
-			}
-			
-			return slot;
-		}
-	}
-	
-	private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) {
-		Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator();
-		
-		while (iter.hasNext()) {
-			List<SharedSlot> slots = iter.next().getValue();
-			
-			if (slots.isEmpty()) {
-				iter.remove();
-			}
-			else if (slots.size() == 1) {
-				SharedSlot slot = slots.remove(0);
-				iter.remove();
-				return slot;
-			}
-			else {
-				return slots.remove(slots.size() - 1);
-			}
-		}
-		
-		return null;
-	}
-	
-	private static void removeSlotFromAllEntries(
-			Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot slot)
-	{
-		final ResourceID taskManagerId = slot.getTaskManagerID();
-		
-		for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) {
-			Map<ResourceID, List<SharedSlot>> map = entry.getValue();
-
-			List<SharedSlot> list = map.get(taskManagerId);
-			if (list != null) {
-				list.remove(slot);
-				if (list.isEmpty()) {
-					map.remove(taskManagerId);
-				}
-			}
-		}
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 0e9f585..df8e9f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
-import java.util.Objects;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -45,8 +41,6 @@ public class CoLocationConstraint {
 
 	private final CoLocationGroup group;
 
-	private volatile SharedSlot sharedSlot;
-
 	private volatile TaskManagerLocation lockedLocation;
 
 	private volatile SlotRequestId slotRequestId;
@@ -62,15 +56,6 @@ public class CoLocationConstraint {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Gets the shared slot into which this constraint's tasks are places.
-	 * 
-	 * @return The shared slot into which this constraint's tasks are places.
-	 */
-	public SharedSlot getSharedSlot() {
-		return sharedSlot;
-	}
-
-	/**
 	 * Gets the ID that identifies the co-location group.
 	 * 
 	 * @return The ID that identifies the co-location group.
@@ -81,9 +66,8 @@ public class CoLocationConstraint {
 
 	/**
 	 * Checks whether the location of this constraint has been assigned.
-	 * The location is assigned once a slot has been set, via the
-	 * {@link #setSharedSlot(org.apache.flink.runtime.instance.SharedSlot)} method,
-	 * and the location is locked via the {@link #lockLocation()} method.
+	 * The location is locked via the {@link #lockLocation(TaskManagerLocation)}
+	 * method.
 	 * 
 	 * @return True if the location has been assigned, false otherwise.
 	 */
@@ -92,23 +76,9 @@ public class CoLocationConstraint {
 	}
 
 	/**
-	 * Checks whether the location of this constraint has been assigned
-	 * (as defined in the {@link #isAssigned()} method, and the current
-	 * shared slot is alive.
-	 *
-	 * @return True if the location has been assigned and the shared slot is alive,
-	 *         false otherwise.
-	 * @deprecated Should only be called by legacy code
-	 */
-	@Deprecated
-	public boolean isAssignedAndAlive() {
-		return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive();
-	}
-
-	/**
 	 * Gets the location assigned to this slot. This method only succeeds after
-	 * the location has been locked via the {@link #lockLocation()} method and
-	 * {@link #isAssigned()} returns true.
+	 * the location has been locked via the {@link #lockLocation(TaskManagerLocation)}
+	 * method and {@link #isAssigned()} returns true.
 	 *
 	 * @return The instance describing the location for the tasks of this constraint.
 	 * @throws IllegalStateException Thrown if the location has not been assigned, yet.
@@ -126,51 +96,6 @@ public class CoLocationConstraint {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Assigns a new shared slot to this co-location constraint. The shared slot
-	 * will hold the subtasks that are executed under this co-location constraint.
-	 * If the constraint's location is assigned, then this slot needs to be from
-	 * the same location (instance) as the one assigned to this constraint.
-	 * 
-	 * <p>If the constraint already has a slot, the current one will be released.</p>
-	 *
-	 * @param newSlot The new shared slot to assign to this constraint.
-	 * @throws IllegalArgumentException If the constraint's location has been assigned and
-	 *                                  the new slot is from a different location.
-	 */
-	public void setSharedSlot(SharedSlot newSlot) {
-		checkNotNull(newSlot);
-
-		if (this.sharedSlot == null) {
-			this.sharedSlot = newSlot;
-		}
-		else if (newSlot != this.sharedSlot){
-			if (lockedLocation != null && !Objects.equals(lockedLocation, newSlot.getTaskManagerLocation())) {
-				throw new IllegalArgumentException(
-						"Cannot assign different location to a constraint whose location is locked.");
-			}
-			if (this.sharedSlot.isAlive()) {
-				this.sharedSlot.releaseSlot(new FlinkException("Setting new shared slot for co-location constraint."));
-			}
-
-			this.sharedSlot = newSlot;
-		}
-	}
-
-	/**
-	 * Locks the location of this slot. The location can be locked only once
-	 * and only after a shared slot has been assigned.
-	 * 
-	 * @throws IllegalStateException Thrown, if the location is already locked,
-	 *                               or is no slot has been set, yet.
-	 */
-	public void lockLocation() throws IllegalStateException {
-		checkState(lockedLocation == null, "Location is already locked");
-		checkState(sharedSlot != null, "Cannot lock location without a slot.");
-
-		lockedLocation = sharedSlot.getTaskManagerLocation();
-	}
-
-	/**
 	 * Locks the location of this slot. The location can be locked only once
 	 * and only after a shared slot has been assigned.
 	 *
@@ -209,9 +134,13 @@ public class CoLocationConstraint {
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
-		return "CoLocation constraint id " + getGroupId() + " shared slot " + sharedSlot;
+		return "CoLocationConstraint{" +
+			"group=" + group +
+			", lockedLocation=" + lockedLocation +
+			", slotRequestId=" + slotRequestId +
+			'}';
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index ef8bd67..c8bc9b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -119,12 +119,6 @@ public class CoLocationGroup implements java.io.Serializable {
 	 * executed any more.</p>
 	 */
 	public void resetConstraints() {
-		for (CoLocationConstraint c : this.constraints) {
-			if (c.isAssignedAndAlive()) {
-				throw new IllegalStateException(
-						"Cannot reset co-location group: some constraints still have live tasks");
-			}
-		}
 		this.constraints.clear();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index 86be9d4..c93456e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -34,13 +33,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 public class SlotSharingGroup implements java.io.Serializable {
 	
 	private static final long serialVersionUID = 1L;
-	
 
 	private final Set<JobVertexID> ids = new TreeSet<JobVertexID>();
-	
-	/** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */
-	private transient SlotSharingGroupAssignment taskAssignment;
-
 	private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
 	
 	public SlotSharingGroup() {}
@@ -68,23 +62,6 @@ public class SlotSharingGroup implements java.io.Serializable {
 	public SlotSharingGroupId getSlotSharingGroupId() {
 		return slotSharingGroupId;
 	}
-
-	public SlotSharingGroupAssignment getTaskAssignment() {
-		if (this.taskAssignment == null) {
-			this.taskAssignment = new SlotSharingGroupAssignment();
-		}
-		
-		return this.taskAssignment;
-	}
-	
-	public void clearTaskAssignment() {
-		if (this.taskAssignment != null) {
-			if (this.taskAssignment.getNumberOfSlots() > 0) {
-				throw new IllegalStateException("SlotSharingGroup cannot clear task assignment, group still has allocated resources.");
-			}
-		}
-		this.taskAssignment = null;
-	}
 	
 	// ------------------------------------------------------------------------
 	//  Utilities
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 13f74f4..ebef29e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -56,7 +55,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -105,7 +103,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for {@link ExecutionGraph} deployment.
@@ -630,13 +627,11 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
 
-		final SimpleSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0);
+		final LogicalSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0);
+		final LogicalSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1);
 
-		final SimpleSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1);
-
-		final SimpleSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0);
-
-		final SimpleSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1);
+		final LogicalSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0);
+		final LogicalSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1);
 
 		slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
 		slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
@@ -751,12 +746,11 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		}
 	}
 
-	private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, int index) {
-		return new SimpleSlot(
-			mock(SlotOwner.class),
-			taskManagerLocation,
-			index,
-			new SimpleAckingTaskManagerGateway());
+	private LogicalSlot createSlot(TaskManagerLocation taskManagerLocation, int index) {
+		return new TestingLogicalSlotBuilder()
+			.setTaskManagerLocation(taskManagerLocation)
+			.setSlotNumber(index)
+			.createTestingLogicalSlot();
 	}
 
 	private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 3850a70..802dff0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -76,8 +75,8 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -139,8 +138,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final InteractionsCountingTaskManagerGateway gatewaySource = createTaskManager();
 		final InteractionsCountingTaskManagerGateway gatewayTarget = createTaskManager();
 
-		final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
-		final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
+		final LogicalSlot sourceSlot = createTestingLogicalSlot(gatewaySource);
+		final LogicalSlot targetSlot = createTestingLogicalSlot(gatewayTarget);
 
 		eg.scheduleForExecution();
 
@@ -164,6 +163,12 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		assertEquals(JobStatus.RUNNING, eg.getState());
 	}
 
+	private TestingLogicalSlot createTestingLogicalSlot(InteractionsCountingTaskManagerGateway gatewaySource) {
+		return new TestingLogicalSlotBuilder()
+			.setTaskManagerGateway(gatewaySource)
+			.createTestingLogicalSlot();
+	}
+
 	/**
 	 * This test verifies that before deploying a pipelined connected component, the
 	 * full set of slots is available, and that not some tasks are deployed, and later the
@@ -203,15 +208,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final InteractionsCountingTaskManagerGateway[] sourceTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism];
 		final InteractionsCountingTaskManagerGateway[] targetTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism];
 
-		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
-		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+		final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
+		final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
 			sourceTaskManagers[i] = createTaskManager();
 			targetTaskManagers[i] = createTaskManager();
 
-			sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId);
-			targetSlots[i] = createSlot(targetTaskManagers[i], jobId);
+			sourceSlots[i] = createTestingLogicalSlot(sourceTaskManagers[i]);
+			targetSlots[i] = createTestingLogicalSlot(targetTaskManagers[i]);
 
 			sourceFutures[i] = new CompletableFuture<>();
 			targetFutures[i] = new CompletableFuture<>();
@@ -298,8 +303,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotOwner.setReturnAllocatedSlotConsumer(
 			(LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
-		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
-		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+		final LogicalSlot[] sourceSlots = new LogicalSlot[parallelism];
+		final LogicalSlot[] targetSlots = new LogicalSlot[parallelism];
 
 		@SuppressWarnings({"unchecked", "rawtypes"})
 		final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
@@ -307,8 +312,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
-			sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
-			targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
+			sourceSlots[i] = createSingleLogicalSlot(slotOwner, taskManager, new SlotRequestId());
+			targetSlots[i] = createSingleLogicalSlot(slotOwner, taskManager, new SlotRequestId());
 
 			sourceFutures[i] = new CompletableFuture<>();
 			targetFutures[i] = new CompletableFuture<>();
@@ -348,8 +353,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		// all completed futures must have been returns
 		for (int i = 0; i < parallelism; i += 2) {
-			assertTrue(sourceSlots[i].isCanceled());
-			assertTrue(targetSlots[i].isCanceled());
+			assertFalse(sourceSlots[i].isAlive());
+			assertFalse(targetSlots[i].isAlive());
 		}
 	}
 
@@ -379,12 +384,12 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			(LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
 		final InteractionsCountingTaskManagerGateway taskManager = createTaskManager();
-		final SimpleSlot[] slots = new SimpleSlot[parallelism];
+		final LogicalSlot[] slots = new LogicalSlot[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
 		final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
-			slots[i] = createSlot(taskManager, jobId, slotOwner);
+			slots[i] = createSingleLogicalSlot(slotOwner, taskManager, new SlotRequestId());
 			slotFutures[i] = new CompletableFuture<>();
 		}
 
@@ -478,7 +483,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
 
-		final SimpleSlot slot = createSlot(new SimpleAckingTaskManagerGateway(), jobGraph.getJobID(), new DummySlotOwner());
+		final LogicalSlot slot = createSingleLogicalSlot(new DummySlotOwner(), new SimpleAckingTaskManagerGateway(), new SlotRequestId());
 		slotProvider.addSlot(jobVertex.getID(), 0, CompletableFuture.completedFuture(slot));
 
 		final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>();
@@ -592,23 +597,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			NoOpPartitionTracker.INSTANCE);
 	}
 
-	private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
-		return createSlot(taskManager, jobId, new TestingSlotOwner());
-	}
-
-	private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) {
-		TaskManagerLocation location = new TaskManagerLocation(
-				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
-
-		SimpleSlotContext slot = new SimpleSlotContext(
-			new AllocationID(),
-			location,
-			0,
-			taskManager);
-
-		return new SimpleSlot(slot, slotOwner, 0);
-	}
-
 	@Nonnull
 	static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
 		TaskManagerLocation location = new TaskManagerLocation(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index ab4048d..21bd002 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
@@ -51,6 +50,7 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
@@ -219,11 +219,12 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
 		final SlotProvider slotProvider = new SlotProvider() {
 			@Override
 			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
-				return CompletableFuture.completedFuture(new SimpleSlot(
-					new SingleSlotTestingSlotOwner(),
-					taskManagerLocation,
-					0,
-					taskManagerGateway));
+				return CompletableFuture.completedFuture(
+					new TestingLogicalSlotBuilder()
+						.setTaskManagerLocation(taskManagerLocation)
+						.setTaskManagerGateway(taskManagerGateway)
+						.setSlotOwner(new SingleSlotTestingSlotOwner())
+						.createTestingLogicalSlot());
 			}
 
 			@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 6df8366..3be8657 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -106,11 +106,7 @@ public class ExecutionTest extends TestLogger {
 
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
-		final SimpleSlot slot = new SimpleSlot(
-			slotOwner,
-			new LocalTaskManagerLocation(),
-			0,
-			new SimpleAckingTaskManagerGateway());
+		final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
 		final LogicalSlot otherSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
@@ -132,6 +128,12 @@ public class ExecutionTest extends TestLogger {
 		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
 	}
 
+	private TestingLogicalSlot createTestingLogicalSlot(SlotOwner slotOwner) {
+		return new TestingLogicalSlotBuilder()
+			.setSlotOwner(slotOwner)
+			.createTestingLogicalSlot();
+	}
+
 	/**
 	 * Tests that the slot is released in case of a execution cancellation when having
 	 * a slot assigned and being in state SCHEDULED.
@@ -143,11 +145,7 @@ public class ExecutionTest extends TestLogger {
 
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
-		final SimpleSlot slot = new SimpleSlot(
-			slotOwner,
-			new LocalTaskManagerLocation(),
-			0,
-			new SimpleAckingTaskManagerGateway());
+		final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
 		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
@@ -193,11 +191,7 @@ public class ExecutionTest extends TestLogger {
 
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
-		final SimpleSlot slot = new SimpleSlot(
-			slotOwner,
-			new LocalTaskManagerLocation(),
-			0,
-			new SimpleAckingTaskManagerGateway());
+		final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
 		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
@@ -563,13 +557,7 @@ public class ExecutionTest extends TestLogger {
 
 		for (JobVertexID jobVertexId : jobVertexIds) {
 			for (int i = 0; i < parallelism; i++) {
-				final SimpleSlot slot = new SimpleSlot(
-					slotOwner,
-					new LocalTaskManagerLocation(),
-					0,
-					new SimpleAckingTaskManagerGateway(),
-					null,
-					null);
+				final LogicalSlot slot = createTestingLogicalSlot(slotOwner);
 
 				slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 3e95587..37e5183 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -37,9 +36,13 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -237,15 +240,22 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		//  - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
 		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
 
-		SlotContext slot = new SimpleSlotContext(
+		SlotContext slotContext = new SimpleSlotContext(
 			new AllocationID(),
 			location,
 			0,
 			mock(TaskManagerGateway.class));
 
-		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 
-		if (!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) {
+
+		LogicalSlot slot = new SingleLogicalSlot(
+			new SlotRequestId(),
+			slotContext,
+			null,
+			Locality.LOCAL,
+			mock(SlotOwner.class));
+
+		if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) {
 			throw new FlinkException("Could not assign resource.");
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
deleted file mode 100644
index dcf5c98..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the allocation, properties, and release of shared slots.
- */
-public class SharedSlotsTest extends TestLogger {
-
-	private static final Iterable<TaskManagerLocation> NO_LOCATION = Collections.emptySet();
-
-	@Test
-	public void allocateAndReleaseEmptySlot() {
-		try {
-			JobVertexID vertexId = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-			
-			assertEquals(0, assignment.getNumberOfSlots());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vertexId));
-
-			// get a shared slot
-			SharedSlot slot = getSharedSlot(assignment);
-			
-			// check that the new slot is fresh
-			assertTrue(slot.isAlive());
-			assertFalse(slot.isCanceled());
-			assertFalse(slot.isReleased());
-			assertEquals(0, slot.getNumberLeaves());
-			assertFalse(slot.hasChildren());
-			assertTrue(slot.isRootAndEmpty());
-			assertNotNull(slot.toString());
-			assertTrue(slot.getSubSlots().isEmpty());
-			assertEquals(0, slot.getSlotNumber());
-			assertEquals(0, slot.getRootSlotNumber());
-			
-			// release the slot immediately.
-			slot.releaseSlot(null);
-
-			assertTrue(slot.isCanceled());
-			assertTrue(slot.isReleased());
-
-			assertEquals(0, assignment.getNumberOfSlots());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vertexId));
-			
-			// we should not be able to allocate any children from this released slot
-			assertNull(slot.allocateSharedSlot(new AbstractID()));
-			assertNull(slot.allocateSubSlot(new AbstractID()));
-			
-			// we cannot add this slot to the assignment group
-			assertNull(assignment.addSharedSlotAndAllocateSubSlot(slot, Locality.NON_LOCAL, vertexId));
-			assertEquals(0, assignment.getNumberOfSlots());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void allocateSimpleSlotsAndReleaseFromRoot() {
-		try {
-			JobVertexID vid1 = new JobVertexID();
-			JobVertexID vid2 = new JobVertexID();
-			JobVertexID vid3 = new JobVertexID();
-			JobVertexID vid4 = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid1, vid2, vid3, vid4);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-
-			// get a shared slot
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-
-			// allocate a series of sub slots
-
-			SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, vid1);
-			assertNotNull(sub1);
-
-			assertNull(sub1.getPayload());
-			assertEquals(Locality.LOCAL, sub1.getLocality());
-			assertEquals(1, sub1.getNumberLeaves());
-			assertEquals(vid1, sub1.getGroupID());
-			assertEquals(sharedSlot, sub1.getParent());
-			assertEquals(sharedSlot, sub1.getRoot());
-			assertEquals(0, sub1.getRootSlotNumber());
-			assertEquals(0, sub1.getSlotNumber());
-
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
-			
-			SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION);
-			assertNotNull(sub2);
-			
-			assertNull(sub2.getPayload());
-			assertEquals(Locality.UNCONSTRAINED, sub2.getLocality());
-			assertEquals(1, sub2.getNumberLeaves());
-			assertEquals(vid2, sub2.getGroupID());
-			assertEquals(sharedSlot, sub2.getParent());
-			assertEquals(sharedSlot, sub2.getRoot());
-			assertEquals(0, sub2.getRootSlotNumber());
-			assertEquals(1, sub2.getSlotNumber());
-
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
-			
-			SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(sub2.getTaskManagerLocation()));
-			assertNotNull(sub3);
-			
-			assertNull(sub3.getPayload());
-			assertEquals(Locality.LOCAL, sub3.getLocality());
-			assertEquals(1, sub3.getNumberLeaves());
-			assertEquals(vid3, sub3.getGroupID());
-			assertEquals(sharedSlot, sub3.getParent());
-			assertEquals(sharedSlot, sub3.getRoot());
-			assertEquals(0, sub3.getRootSlotNumber());
-			assertEquals(2, sub3.getSlotNumber());
-
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
-
-			LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-			SimpleSlot sub4 = assignment.getSlotForTask(vid4,
-					Collections.singleton(taskManagerLocation));
-			assertNotNull(sub4);
-			
-			assertNull(sub4.getPayload());
-			assertEquals(Locality.NON_LOCAL, sub4.getLocality());
-			assertEquals(1, sub4.getNumberLeaves());
-			assertEquals(vid4, sub4.getGroupID());
-			assertEquals(sharedSlot, sub4.getParent());
-			assertEquals(sharedSlot, sub4.getRoot());
-			assertEquals(0, sub4.getRootSlotNumber());
-			assertEquals(3, sub4.getSlotNumber());
-
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4));
-			
-			// release from the root.
-			sharedSlot.releaseSlot(null);
-
-			assertTrue(sharedSlot.isReleased());
-			assertTrue(sub1.isReleased());
-			assertTrue(sub2.isReleased());
-			assertTrue(sub3.isReleased());
-			assertTrue(sub4.isReleased());
-			
-			assertEquals(0, sharedSlot.getNumberLeaves());
-			assertFalse(sharedSlot.hasChildren());
-
-			assertEquals(0, assignment.getNumberOfSlots());
-
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4));
-
-			assertNull(sharedSlot.allocateSharedSlot(new AbstractID()));
-			assertNull(sharedSlot.allocateSubSlot(new AbstractID()));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void allocateSimpleSlotsAndReleaseFromLeaves() {
-		try {
-			JobVertexID vid1 = new JobVertexID();
-			JobVertexID vid2 = new JobVertexID();
-			JobVertexID vid3 = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid1, vid2, vid3);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-
-			// allocate a shared slot
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-
-			// allocate a series of sub slots
-
-			SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid1);
-			SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION);
-			SimpleSlot sub3 = assignment.getSlotForTask(vid3, NO_LOCATION);
-			
-			assertNotNull(sub1);
-			assertNotNull(sub2);
-			assertNotNull(sub3);
-
-			assertEquals(3, sharedSlot.getNumberLeaves());
-
-			assertEquals(1, assignment.getNumberOfSlots());
-			
-			// release from the leaves.
-			
-			sub2.releaseSlot(null);
-
-			assertTrue(sharedSlot.isAlive());
-			assertTrue(sub1.isAlive());
-			assertTrue(sub2.isReleased());
-			assertTrue(sub3.isAlive());
-			
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfSlots());
-			
-			assertEquals(2, sharedSlot.getNumberLeaves());
-
-			
-			sub1.releaseSlot(null);
-
-			assertTrue(sharedSlot.isAlive());
-			assertTrue(sub1.isReleased());
-			assertTrue(sub2.isReleased());
-			assertTrue(sub3.isAlive());
-			
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfSlots());
-			
-			assertEquals(1, sharedSlot.getNumberLeaves());
-
-			sub3.releaseSlot(null);
-
-			assertTrue(sharedSlot.isReleased());
-			assertTrue(sub1.isReleased());
-			assertTrue(sub2.isReleased());
-			assertTrue(sub3.isReleased());
-
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(0, assignment.getNumberOfSlots());
-
-			assertEquals(0, assignment.getNumberOfSlots());
-
-			assertNull(sharedSlot.allocateSharedSlot(new AbstractID()));
-			assertNull(sharedSlot.allocateSubSlot(new AbstractID()));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void allocateAndReleaseInMixedOrder() {
-		try {
-			JobVertexID vid1 = new JobVertexID();
-			JobVertexID vid2 = new JobVertexID();
-			JobVertexID vid3 = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid1, vid2, vid3);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-
-			// get a shared slot
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-
-			// allocate a series of sub slots
-
-			SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid1);
-			SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION);
-
-			assertNotNull(sub1);
-			assertNotNull(sub2);
-
-			assertEquals(2, sharedSlot.getNumberLeaves());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfSlots());
-			
-			
-			sub2.releaseSlot(null);
-
-			assertEquals(1, sharedSlot.getNumberLeaves());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfSlots());
-			
-			
-			SimpleSlot sub3 = assignment.getSlotForTask(vid3, NO_LOCATION);
-			assertNotNull(sub3);
-			
-			assertEquals(2, sharedSlot.getNumberLeaves());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(1, assignment.getNumberOfSlots());
-			
-			sub3.releaseSlot(null);
-			sub1.releaseSlot(null);
-
-			assertTrue(sharedSlot.isReleased());
-			assertEquals(0, sharedSlot.getNumberLeaves());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid2));
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
-			assertEquals(0, assignment.getNumberOfSlots());
-
-			assertEquals(0, assignment.getNumberOfSlots());
-
-			assertNull(sharedSlot.allocateSharedSlot(new AbstractID()));
-			assertNull(sharedSlot.allocateSubSlot(new AbstractID()));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * We allocate and release the structure below, starting by allocating a simple slot in the
-	 * shared slot and finishing by releasing a simple slot.
-	 * 
-	 * <pre>
-	 *     Shared(0)(root)
-	 *        |
-	 *        +-- Simple(2)(sink)
-	 *        |
-	 *        +-- Shared(1)(co-location-group)
-	 *        |      |
-	 *        |      +-- Simple(0)(tail)
-	 *        |      +-- Simple(1)(head)
-	 *        |
-	 *        +-- Simple(0)(source)
-	 * </pre>
-	 */
-	@Test
-	public void testAllocateAndReleaseTwoLevels() {
-		try {
-			JobVertexID sourceId = new JobVertexID();
-			JobVertexID headId = new JobVertexID();
-			JobVertexID tailId = new JobVertexID();
-			JobVertexID sinkId = new JobVertexID();
-
-			JobVertex headVertex = new JobVertex("head", headId);
-			JobVertex tailVertex = new JobVertex("tail", tailId);
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(sourceId, headId, tailId, sinkId);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-			assertEquals(0, assignment.getNumberOfSlots());
-			
-			CoLocationGroup coLocationGroup = new CoLocationGroup(headVertex, tailVertex);
-			CoLocationConstraint constraint = coLocationGroup.getLocationConstraint(0);
-			assertFalse(constraint.isAssigned());
-
-			// allocate a shared slot
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-			
-			// get the first simple slot
-			SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
-			
-			assertEquals(1, sharedSlot.getNumberLeaves());
-			
-			// get the first slot in the nested shared slot from the co-location constraint
-			SimpleSlot headSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet());
-			assertEquals(2, sharedSlot.getNumberLeaves());
-
-			assertNotNull(constraint.getSharedSlot());
-			assertTrue(constraint.getSharedSlot().isAlive());
-			assertFalse(constraint.isAssigned());
-			
-			// we do not immediately lock the location
-			headSlot.releaseSlot();
-			assertEquals(1, sharedSlot.getNumberLeaves());
-
-			assertNotNull(constraint.getSharedSlot());
-			assertTrue(constraint.getSharedSlot().isReleased());
-			assertFalse(constraint.isAssigned());
-			
-			// re-allocate the head slot
-			headSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet());
-			
-			constraint.lockLocation();
-			assertNotNull(constraint.getSharedSlot());
-			assertTrue(constraint.isAssigned());
-			assertTrue(constraint.isAssignedAndAlive());
-			
-			SimpleSlot tailSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet());
-			
-			assertEquals(constraint.getSharedSlot(), headSlot.getParent());
-			assertEquals(constraint.getSharedSlot(), tailSlot.getParent());
-			
-			SimpleSlot sinkSlot = assignment.getSlotForTask(sinkId, Collections.<TaskManagerLocation>emptySet());
-			assertEquals(4, sharedSlot.getNumberLeaves());
-			
-			// we release our co-location constraint tasks
-			headSlot.releaseSlot(null);
-			tailSlot.releaseSlot(null);
-
-			assertEquals(2, sharedSlot.getNumberLeaves());
-			assertTrue(headSlot.isReleased());
-			assertTrue(tailSlot.isReleased());
-			assertTrue(constraint.isAssigned());
-			assertFalse(constraint.isAssignedAndAlive());
-			
-			// we should have resources again for the co-location constraint
-			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
-			
-			// re-allocate head and tail from the constraint
-			headSlot = assignment.getSlotForTask(constraint, NO_LOCATION);
-			tailSlot = assignment.getSlotForTask(constraint, NO_LOCATION);
-			
-			assertEquals(4, sharedSlot.getNumberLeaves());
-			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
-			
-			// verify some basic properties of the slots
-			assertEquals(sourceSlot.getTaskManagerID(), headSlot.getTaskManagerID());
-			assertEquals(sourceSlot.getTaskManagerID(), tailSlot.getTaskManagerID());
-			assertEquals(sourceSlot.getTaskManagerID(), sinkSlot.getTaskManagerID());
-
-			assertEquals(sourceId, sourceSlot.getGroupID());
-			assertEquals(sinkId, sinkSlot.getGroupID());
-			assertNull(headSlot.getGroupID());
-			assertNull(tailSlot.getGroupID());
-			assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID());
-			
-			// release all
-			sourceSlot.releaseSlot(null);
-			headSlot.releaseSlot(null);
-			tailSlot.releaseSlot(null);
-			sinkSlot.releaseSlot(null);
-			
-			assertTrue(sharedSlot.isReleased());
-			assertTrue(sourceSlot.isReleased());
-			assertTrue(headSlot.isReleased());
-			assertTrue(tailSlot.isReleased());
-			assertTrue(sinkSlot.isReleased());
-			assertTrue(constraint.getSharedSlot().isReleased());
-			
-			assertTrue(constraint.isAssigned());
-			assertFalse(constraint.isAssignedAndAlive());
-
-			assertEquals(0, assignment.getNumberOfSlots());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * We allocate and the structure below and release it from the root.
-	 *
-	 * <pre>
-	 *     Shared(0)(root)
-	 *        |
-	 *        +-- Simple(2)(sink)
-	 *        |
-	 *        +-- Shared(1)(co-location-group)
-	 *        |      |
-	 *        |      +-- Simple(0)(tail)
-	 *        |      +-- Simple(1)(head)
-	 *        |
-	 *        +-- Simple(0)(source)
-	 * </pre>
-	 */
-	@Test
-	public void testReleaseTwoLevelsFromRoot() {
-		try {
-			JobVertexID sourceId = new JobVertexID();
-			JobVertexID headId = new JobVertexID();
-			JobVertexID tailId = new JobVertexID();
-			JobVertexID sinkId = new JobVertexID();
-
-			JobVertex headVertex = new JobVertex("head", headId);
-			JobVertex tailVertex = new JobVertex("tail", tailId);
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(sourceId, headId, tailId, sinkId);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-			assertEquals(0, assignment.getNumberOfSlots());
-
-			CoLocationGroup coLocationGroup = new CoLocationGroup(headVertex, tailVertex);
-			CoLocationConstraint constraint = coLocationGroup.getLocationConstraint(0);
-			assertFalse(constraint.isAssigned());
-
-			// allocate a shared slot
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-
-			// get the first simple slot
-			SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
-			
-			SimpleSlot headSlot = assignment.getSlotForTask(constraint, NO_LOCATION);
-			constraint.lockLocation();
-			SimpleSlot tailSlot = assignment.getSlotForTask(constraint, NO_LOCATION);
-			
-			SimpleSlot sinkSlot = assignment.getSlotForTask(sinkId, NO_LOCATION);
-			
-			assertEquals(4, sharedSlot.getNumberLeaves());
-
-			// release all
-			sourceSlot.releaseSlot(null);
-			headSlot.releaseSlot(null);
-			tailSlot.releaseSlot(null);
-			sinkSlot.releaseSlot(null);
-
-			assertTrue(sharedSlot.isReleased());
-			assertTrue(sourceSlot.isReleased());
-			assertTrue(headSlot.isReleased());
-			assertTrue(tailSlot.isReleased());
-			assertTrue(sinkSlot.isReleased());
-			assertTrue(constraint.getSharedSlot().isReleased());
-
-			assertTrue(constraint.isAssigned());
-			assertFalse(constraint.isAssignedAndAlive());
-			
-			assertEquals(0, assignment.getNumberOfSlots());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testImmediateReleaseOneLevel() {
-		try {
-			JobVertexID vid = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-
-			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
-			sub.releaseSlot(null);
-			
-			assertTrue(sub.isReleased());
-			assertTrue(sharedSlot.isReleased());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testImmediateReleaseTwoLevel() {
-		try {
-			JobVertexID vid = new JobVertexID();
-			JobVertex vertex = new JobVertex("vertex", vid);
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
-			SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-
-			CoLocationGroup coLocationGroup = new CoLocationGroup(vertex);
-			CoLocationConstraint constraint = coLocationGroup.getLocationConstraint(0);
-
-			SharedSlot sharedSlot = getSharedSlot(assignment);
-
-			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint);
-			
-			assertNull(sub.getGroupID());
-			assertEquals(constraint.getSharedSlot(), sub.getParent());
-			
-			sub.releaseSlot(null);
-
-			assertTrue(sub.isReleased());
-			assertTrue(sharedSlot.isReleased());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	public SharedSlot getSharedSlot(SlotSharingGroupAssignment assignment) {
-		final TestingSlotOwner slotOwner = new TestingSlotOwner();
-		slotOwner.setReturnAllocatedSlotConsumer((LogicalSlot logicalSlot) -> ((SharedSlot) logicalSlot).markReleased());
-		return new SharedSlot(
-			slotOwner,
-			new LocalTaskManagerLocation(),
-			0,
-			new SimpleAckingTaskManagerGateway(),
-			assignment);
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
deleted file mode 100644
index affe5cb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingPayload;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SimpleSlotTest extends  TestLogger {
-
-	@Test
-	public void testStateTransitions() {
-		try {
-			// release immediately
-			{
-				SimpleSlot slot = getSlot();
-				assertTrue(slot.isAlive());
-
-				slot.releaseSlot(null);
-				assertFalse(slot.isAlive());
-				assertTrue(slot.isCanceled());
-				assertTrue(slot.isReleased());
-			}
-
-			// state transitions manually
-			{
-				SimpleSlot slot = getSlot();
-				assertTrue(slot.isAlive());
-
-				slot.markCancelled();
-				assertFalse(slot.isAlive());
-				assertTrue(slot.isCanceled());
-				assertFalse(slot.isReleased());
-
-				slot.markReleased();
-				assertFalse(slot.isAlive());
-				assertTrue(slot.isCanceled());
-				assertTrue(slot.isReleased());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSetExecutionVertex() {
-		try {
-			TestingPayload payload1 = new TestingPayload();
-			TestingPayload payload2 = new TestingPayload();
-
-			// assign to alive slot
-			{
-				SimpleSlot slot = getSlot();
-
-				assertTrue(slot.tryAssignPayload(payload1));
-				assertEquals(payload1, slot.getPayload());
-
-				// try to add another one
-				assertFalse(slot.tryAssignPayload(payload2));
-				assertEquals(payload1, slot.getPayload());
-			}
-
-			// assign to canceled slot
-			{
-				SimpleSlot slot = getSlot();
-				assertTrue(slot.markCancelled());
-
-				assertFalse(slot.tryAssignPayload(payload1));
-				assertNull(slot.getPayload());
-			}
-
-			// assign to released marked slot
-			{
-				SimpleSlot slot = getSlot();
-				assertTrue(slot.markCancelled());
-				assertTrue(slot.markReleased());
-
-				assertFalse(slot.tryAssignPayload(payload1));
-				assertNull(slot.getPayload());
-			}
-			
-			// assign to released
-			{
-				SimpleSlot slot = getSlot();
-				slot.releaseSlot(null);
-
-				assertFalse(slot.tryAssignPayload(payload1));
-				assertNull(slot.getPayload());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	public static SimpleSlot getSlot() {
-		final TestingSlotOwner slotOwner = new TestingSlotOwner();
-		slotOwner.setReturnAllocatedSlotConsumer((LogicalSlot logicalSlot) -> ((SimpleSlot) logicalSlot).markReleased());
-		return new SimpleSlot(
-			slotOwner,
-			new LocalTaskManagerLocation(),
-			0,
-			new SimpleAckingTaskManagerGateway());
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index eccccf7..cad0af5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -365,7 +365,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {
 		s2.releaseSlot();
 
 		assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
 		LogicalSlot s3 = testingSlotProvider.allocateSlot(
 				new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
@@ -412,7 +411,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {
 		s2.releaseSlot();
 
 		assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
 		LogicalSlot sa = testingSlotProvider.allocateSlot(
 				new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
@@ -487,10 +485,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {
 		s3.releaseSlot();
 		s4.releaseSlot();
 		assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
-
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
 	}
 
 	@Test
@@ -529,10 +523,6 @@ public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {
 		s4.releaseSlot();
 
 		assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
-
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-		assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
 	}
 
 	private static SlotProfile slotProfileForLocation(TaskManagerLocation location) {