You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 11:06:22 UTC

[flink] branch master updated: [FLINK-10569][tests] Replace various Scheduler usages

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 568d90c  [FLINK-10569][tests] Replace various Scheduler usages
568d90c is described below

commit 568d90cf0b718fb659a5fd09319df9b212a2ec2d
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Thu Jan 31 19:06:15 2019 +0800

    [FLINK-10569][tests] Replace various Scheduler usages
---
 .../ExecutionGraphConstructionTest.java            |   20 +-
 .../ExecutionGraphDeploymentTest.java              |   34 +-
 .../executiongraph/ExecutionGraphMetricsTest.java  |   16 +-
 .../ExecutionGraphRescalingTest.java               |   10 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |    5 +-
 .../executiongraph/ExecutionVertexCancelTest.java  |   17 +-
 .../ExecutionVertexSchedulingTest.java             |   34 +-
 .../executiongraph/PointwisePatternTest.java       |   96 +-
 .../RestartPipelinedRegionStrategyTest.java        |   14 +-
 .../executiongraph/VertexSlotSharingTest.java      |    4 +-
 .../scheduler/SchedulerIsolatedTasksTest.java      |    2 +-
 .../scheduler/SchedulerSlotSharingTest.java        | 1052 --------------------
 .../jobmanager/scheduler/SchedulerTest.java        |  135 ---
 .../jobmanager/scheduler/SchedulerTestBase.java    |    2 +-
 .../partitioner/RescalePartitionerTest.java        |    5 +-
 15 files changed, 95 insertions(+), 1351 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 00d1ac0..c3604f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -44,6 +43,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
@@ -119,7 +119,7 @@ public class ExecutionGraphConstructionTest {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -167,10 +167,10 @@ public class ExecutionGraphConstructionTest {
 			jobId, 
 			jobName, 
 			cfg,
-				new SerializedValue<>(new ExecutionConfig()),
+			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -246,7 +246,7 @@ public class ExecutionGraphConstructionTest {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -317,7 +317,7 @@ public class ExecutionGraphConstructionTest {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -383,7 +383,7 @@ public class ExecutionGraphConstructionTest {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 		try {
 			eg.attachJobGraph(ordered);
 			fail("Attached wrong jobgraph");
@@ -453,7 +453,7 @@ public class ExecutionGraphConstructionTest {
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
+				new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 			try {
 				eg.attachJobGraph(ordered);
 			}
@@ -501,7 +501,7 @@ public class ExecutionGraphConstructionTest {
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
+				new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 
 			try {
 				eg.attachJobGraph(ordered);
@@ -584,7 +584,7 @@ public class ExecutionGraphConstructionTest {
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
+				new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 			
 			eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
 			
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277..3f3142c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -52,9 +52,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -67,6 +68,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -164,7 +166,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
 				new RestartAllStrategy.Factory(),
-				new Scheduler(TestingUtils.defaultExecutionContext()),
+				new TestingSlotProvider(ignore -> new CompletableFuture<>()),
 				ExecutionGraph.class.getClassLoader(),
 				blobWriter,
 				AkkaUtils.getDefaultTimeout());
@@ -426,15 +428,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
 
-		Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
+		final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
 		for (int i = 0; i < dop1; i++) {
-			scheduler.newInstanceAvailable(
-				ExecutionGraphTestUtils.getInstance(
-					new ActorTaskManagerGateway(
-						new ExecutionGraphTestUtils.SimpleActorGateway(
-							TestingUtils.directExecutionContext()))));
+			slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot()));
 		}
 
+		final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst());
+
 		final JobInformation jobInformation = new DummyJobInformation(
 			jobId,
 			"failing test job");
@@ -447,7 +447,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			scheduler,
+			slotProvider,
 			ExecutionGraph.class.getClassLoader(),
 			blobWriter,
 			AkkaUtils.getDefaultTimeout());
@@ -459,8 +459,6 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
-
 		// schedule, this triggers mock deployment
 		eg.scheduleForExecution();
 
@@ -508,15 +506,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		v1.setInvokableClass(BatchTask.class);
 		v2.setInvokableClass(BatchTask.class);
 
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
 		for (int i = 0; i < dop1 + dop2; i++) {
-			scheduler.newInstanceAvailable(
-					ExecutionGraphTestUtils.getInstance(
-							new ActorTaskManagerGateway(
-									new ExecutionGraphTestUtils.SimpleActorGateway(
-											TestingUtils.directExecutionContext()))));
+			slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot()));
 		}
 
+		final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst());
+
 		final JobInformation jobInformation = new DummyJobInformation(
 			jobId,
 			"some job");
@@ -529,7 +525,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			scheduler,
+			slotProvider,
 			ExecutionGraph.class.getClassLoader(),
 			blobWriter,
 			AkkaUtils.getDefaultTimeout());
@@ -540,8 +536,6 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
-
 		// schedule, this triggers mock deployment
 		eg.scheduleForExecution();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index ec0d2e3..9a410fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -31,10 +30,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -44,6 +40,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -52,10 +49,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
@@ -76,11 +69,12 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			Configuration jobConfig = new Configuration();
 			Time timeout = Time.seconds(10L);
-			Scheduler scheduler = mock(Scheduler.class);
 
 			CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
 			CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
-			when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2);
+			ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
+			slotFutures.addLast(slotFuture1);
+			slotFutures.addLast(slotFuture2);
 
 			TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 
@@ -93,7 +87,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 				new SerializedValue<>(null),
 				timeout,
 				testingRestartStrategy,
-				scheduler);
+				new TestingSlotProvider(ignore -> slotFutures.removeFirst()));
 
 			RestartTimeGauge restartingTime = new RestartTimeGauge(executionGraph);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
index 69764e7..4b22f69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -69,7 +69,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
 			config,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new TestingSlotProvider(ignore -> new CompletableFuture<>()),
 			Thread.currentThread().getContextClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
 			AkkaUtils.getDefaultTimeout(),
@@ -98,7 +98,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
 			config,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new TestingSlotProvider(ignore -> new CompletableFuture<>()),
 			Thread.currentThread().getContextClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
 			AkkaUtils.getDefaultTimeout(),
@@ -127,7 +127,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
 			config,
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new TestingSlotProvider(ignore -> new CompletableFuture<>()),
 			Thread.currentThread().getContextClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
 			AkkaUtils.getDefaultTimeout(),
@@ -169,7 +169,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
 				config,
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
-				new Scheduler(TestingUtils.defaultExecutionContext()),
+				new TestingSlotProvider(ignore -> new CompletableFuture<>()),
 				Thread.currentThread().getContextClassLoader(),
 				new StandaloneCheckpointRecoveryFactory(),
 				AkkaUtils.getDefaultTimeout(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 0d603fc..e8bd30d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -71,12 +70,12 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -583,7 +582,7 @@ public class ExecutionGraphTestUtils {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
-			new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 
 		return spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout()));
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 4189476..d64f15f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -41,6 +40,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 
 import scala.concurrent.ExecutionContext;
 
@@ -53,7 +53,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 @SuppressWarnings("serial")
 public class ExecutionVertexCancelTest extends TestLogger {
@@ -454,8 +453,11 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			// scheduling after being canceled should be tolerated (no exception) because
 			// it can occur as the result of races
 			{
-				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+				vertex.scheduleForExecution(
+					new TestingSlotProvider(ignore -> new CompletableFuture<>()),
+					false,
+					LocationPreferenceConstraint.ALL,
+					Collections.emptySet());
 
 				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
@@ -493,8 +495,11 @@ public class ExecutionVertexCancelTest extends TestLogger {
 						AkkaUtils.getDefaultTimeout());
 				setVertexState(vertex, ExecutionState.CANCELING);
 
-				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+				vertex.scheduleForExecution(
+					new TestingSlotProvider(ignore -> new CompletableFuture<>()),
+					false,
+					LocationPreferenceConstraint.ALL,
+					Collections.emptySet());
 			}
 			catch (Exception e) {
 				fail("should not throw an exception");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 6bfcb7f..0bc53d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -18,20 +18,15 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
@@ -44,10 +39,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class ExecutionVertexSchedulingTest {
 
@@ -65,14 +56,16 @@ public class ExecutionVertexSchedulingTest {
 			slot.releaseSlot();
 			assertTrue(slot.isReleased());
 
-			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
 			future.complete(slot);
-			when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+			vertex.scheduleForExecution(
+				new TestingSlotProvider(ignore -> future),
+				false,
+				LocationPreferenceConstraint.ALL,
+				Collections.emptySet());
 
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -99,12 +92,13 @@ public class ExecutionVertexSchedulingTest {
 
 			final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
 
-			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
-
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL, Collections.emptySet());
+			vertex.scheduleForExecution(
+				new TestingSlotProvider(ignore -> future),
+				true,
+				LocationPreferenceConstraint.ALL,
+				Collections.emptySet());
 
 			// future has not yet a slot
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -131,15 +125,17 @@ public class ExecutionVertexSchedulingTest {
 				new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())));
 			final SimpleSlot slot = instance.allocateSimpleSlot();
 
-			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
 			future.complete(slot);
-			when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+			vertex.scheduleForExecution(
+				new TestingSlotProvider(ignore -> future),
+				false,
+				LocationPreferenceConstraint.ALL,
+				Collections.emptySet());
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 		}
 		catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 8ff0032..0344326 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -36,6 +35,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
@@ -45,10 +45,6 @@ import org.apache.flink.api.common.JobID;
 
 
 public class PointwisePatternTest {
-
-	private final JobID jobId = new JobID();
-	private final String jobName = "Test Job Sample Name";
-	private final Configuration cfg = new Configuration();
 	
 	@Test
 	public void testNToN() throws Exception {
@@ -67,16 +63,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(), 
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -114,16 +101,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -162,16 +140,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -211,16 +180,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName,
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -258,16 +218,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -303,6 +254,19 @@ public class PointwisePatternTest {
 		testHighToLow(20, 15);
 		testHighToLow(31, 11);
 	}
+
+	private ExecutionGraph getDummyExecutionGraph() throws Exception {
+		return new ExecutionGraph(
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			new JobID(),
+			"Test Job Sample Name",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+	}
 	
 	private void testLowToHigh(int lowDop, int highDop) throws Exception {
 		if (highDop < lowDop) {
@@ -325,16 +289,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -383,16 +338,7 @@ public class PointwisePatternTest {
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			jobName, 
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
+		ExecutionGraph eg = getDummyExecutionGraph();
 		try {
 			eg.attachJobGraph(ordered);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index 9974574..7b00e4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
@@ -37,6 +36,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -89,7 +89,6 @@ public class RestartPipelinedRegionStrategyTest {
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
         final JobInformation jobInformation = new DummyJobInformation(
 			jobId,
 			jobName);
@@ -101,7 +100,7 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            scheduler,
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()),
             ExecutionGraph.class.getClassLoader(),
 			VoidBlobWriter.getInstance(),
 			AkkaUtils.getDefaultTimeout());
@@ -174,7 +173,6 @@ public class RestartPipelinedRegionStrategyTest {
 
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
         final JobInformation jobInformation = new DummyJobInformation(
 			jobId,
 			jobName);
@@ -186,7 +184,7 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            scheduler,
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()),
             ExecutionGraph.class.getClassLoader(),
 			VoidBlobWriter.getInstance(),
 			AkkaUtils.getDefaultTimeout());
@@ -264,7 +262,6 @@ public class RestartPipelinedRegionStrategyTest {
 
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
 		final JobInformation jobInformation = new DummyJobInformation(
 			jobId,
 			jobName);
@@ -276,7 +273,7 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            scheduler,
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()),
             ExecutionGraph.class.getClassLoader(),
 			VoidBlobWriter.getInstance(),
 			AkkaUtils.getDefaultTimeout());
@@ -345,7 +342,6 @@ public class RestartPipelinedRegionStrategyTest {
 
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4));
 
-        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
 		final JobInformation jobInformation = new DummyJobInformation(
 			jobId,
 			jobName);
@@ -357,7 +353,7 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            scheduler,
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()),
             ExecutionGraph.class.getClassLoader(),
 			VoidBlobWriter.getInstance(),
 			AkkaUtils.getDefaultTimeout());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 90e3368..a12710c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -90,7 +90,7 @@ public class VertexSlotSharingTest {
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
+				new TestingSlotProvider(ignored -> new CompletableFuture<>()));
 			eg.attachJobGraph(vertices);
 			
 			// verify that the vertices are all in the same slot sharing group
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 03da3b1..4fbb7f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link Scheduler} when scheduling individual tasks.
+ * Tests for scheduling individual tasks.
  */
 public class SchedulerIsolatedTasksTest extends SchedulerTestBase {
 	
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
deleted file mode 100644
index 20e221d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ /dev/null
@@ -1,1052 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.areAllDistinct;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertexWithLocation;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.sleepUninterruptibly;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the scheduler when scheduling tasks in slot sharing groups.
- */
-public class SchedulerSlotSharingTest extends SchedulerTestBase {
-
-	@Test
-	public void scheduleSingleVertexType() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
-
-			final ResourceID tm1ResourceId = testingSlotProvider.addTaskManager(2).getResourceID();
-			testingSlotProvider.addTaskManager(2);
-			
-			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s1);
-			assertNotNull(s2);
-			assertNotNull(s3);
-			assertNotNull(s4);
-			
-			assertTrue(areAllDistinct(s1, s2, s3, s4));
-			
-			// we cannot schedule another task from the first vertex group
-			try {
-				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			// release something
-			s3.releaseSlot();
-			
-			// allocate another slot from that group
-			LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(s5);
-			
-			// release all old slots
-			s1.releaseSlot();
-			s2.releaseSlot();
-			s4.releaseSlot();
-			
-			LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s6);
-			assertNotNull(s7);
-			assertNotNull(s8);
-			
-			// make sure we have two slots on the first instance, and two on the second
-			int c = 0;
-			c += (s5.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
-			c += (s6.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
-			c += (s7.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
-			c += (s8.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
-			assertEquals(0, c);
-			
-			// release all
-			s5.releaseSlot();
-			s6.releaseSlot();
-			s7.releaseSlot();
-			s8.releaseSlot();
-			
-			// test that everything is released
-			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-
-			// check the scheduler's bookkeeping
-			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void allocateSlotWithSharing() throws Exception {
-		JobVertexID jid1 = new JobVertexID();
-		JobVertexID jid2 = new JobVertexID();
-
-		SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-		testingSlotProvider.addTaskManager(2);
-		testingSlotProvider.addTaskManager(2);
-
-		// schedule 4 tasks from the first vertex group
-		LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
-		assertNotNull(s1);
-		assertNotNull(s2);
-		assertNotNull(s3);
-		assertNotNull(s4);
-
-		assertTrue(areAllDistinct(s1, s2, s3, s4));
-
-		// we cannot schedule another task from the first vertex group
-		try {
-			testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			fail("Scheduler accepted too many tasks at the same time");
-		}
-		catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof NoResourceAvailableException);
-		}
-		catch (Exception e) {
-			fail("Wrong exception.");
-		}
-
-		// schedule some tasks from the second ID group
-		LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
-		assertNotNull(s1_2);
-		assertNotNull(s2_2);
-		assertNotNull(s3_2);
-		assertNotNull(s4_2);
-
-		// we cannot schedule another task from the second vertex group
-		try {
-			testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			fail("Scheduler accepted too many tasks at the same time");
-		}
-		catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof NoResourceAvailableException);
-		}
-		catch (Exception e) {
-			fail("Wrong exception.");
-		}
-
-		// now, we release some vertices (sub-slots) from the first group.
-		// that should allow us to schedule more vertices from the first group
-		s1.releaseSlot();
-		s4.releaseSlot();
-
-		assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-		assertEquals(2, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-		assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
-		// we can still not schedule anything from the second group of vertices
-		try {
-			testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			fail("Scheduler accepted too many tasks at the same time");
-		}
-		catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof NoResourceAvailableException);
-		}
-		catch (Exception e) {
-			fail("Wrong exception.");
-		}
-
-		// we can schedule something from the first vertex group
-		LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		assertNotNull(s5);
-
-		assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-		assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-		assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
-
-		// now we release a slot from the second vertex group and schedule another task from that group
-		s2_2.releaseSlot();
-		LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-		assertNotNull(s5_2);
-
-		// release all slots
-		s2.releaseSlot();
-		s3.releaseSlot();
-		s5.releaseSlot();
-
-		s1_2.releaseSlot();
-		s3_2.releaseSlot();
-		s4_2.releaseSlot();
-		s5_2.releaseSlot();
-
-		// test that everything is released
-		assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
-		assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-
-		// check the scheduler's bookkeeping
-		assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
-		assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-		assertEquals(10, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-	}
-	
-	@Test
-	public void allocateSlotWithIntermediateTotallyEmptySharingGroup() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-			testingSlotProvider.addTaskManager(2);
-			testingSlotProvider.addTaskManager(2);
-
-			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-			
-			s1.releaseSlot();
-			s2.releaseSlot();
-			s3.releaseSlot();
-			s4.releaseSlot();
-			
-			assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-			
-			// schedule some tasks from the second ID group
-			LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
-			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-			
-			s1_2.releaseSlot();
-			s2_2.releaseSlot();
-			s3_2.releaseSlot();
-			s4_2.releaseSlot();
-			
-			assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
-			// test that everything is released
-			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void allocateSlotWithTemporarilyEmptyVertexGroup() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-			JobVertexID jid3 = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
-
-			testingSlotProvider.addTaskManager(2);
-			testingSlotProvider.addTaskManager(2);
-
-			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s1_1);
-			assertNotNull(s2_1);
-			assertNotNull(s3_1);
-			assertNotNull(s4_1);
-			
-			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
-			
-			// schedule 4 tasks from the second vertex group
-			LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s1_2);
-			assertNotNull(s2_2);
-			assertNotNull(s3_2);
-			assertNotNull(s4_2);
-			
-			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
-			
-			// schedule 4 tasks from the third vertex group
-			LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s1_3);
-			assertNotNull(s2_3);
-			assertNotNull(s3_3);
-			assertNotNull(s4_3);
-			
-			assertTrue(areAllDistinct(s1_3, s2_3, s3_3, s4_3));
-			
-			
-			// we cannot schedule another task from the second vertex group
-			try {
-				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			// release the second vertex group
-			s1_2.releaseSlot();
-			s2_2.releaseSlot();
-			s3_2.releaseSlot();
-			s4_2.releaseSlot();
-			
-			LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s5_2);
-			assertNotNull(s6_2);
-			assertNotNull(s7_2);
-			
-			// release the slots
-			s1_1.releaseSlot();
-			s2_1.releaseSlot();
-			s3_1.releaseSlot();
-			s4_1.releaseSlot();
-			
-			s5_2.releaseSlot();
-			s6_2.releaseSlot();
-			s7_2.releaseSlot();
-			
-			// test that everything is released
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlots());
-			
-			s1_3.releaseSlot();
-			s2_3.releaseSlot();
-			s3_3.releaseSlot();
-			s4_3.releaseSlot();
-			
-			// test that everything is released
-			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-			JobVertexID jid3 = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-			testingSlotProvider.addTaskManager(2);
-
-			// schedule 1 tasks from the first vertex group and 2 from the second
-			LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s1_1);
-			assertNotNull(s2_1);
-			assertNotNull(s2_2);
-			
-			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-			
-			// release the two from the second
-			s2_1.releaseSlot();
-			s2_2.releaseSlot();
-			
-			
-			// this should free one slot so we can allocate one non-shared
-			LogicalSlot sx = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(sx);
-			
-			assertEquals(1, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
-			assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-			
-			// check the scheduler's bookkeeping
-			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(4, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void scheduleMixedSharingAndNonSharing() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-			
-			JobVertexID jidA = new JobVertexID();
-			JobVertexID jidB= new JobVertexID();
-			JobVertexID jidC = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-			testingSlotProvider.addTaskManager(3);
-			testingSlotProvider.addTaskManager(2);
-
-			// schedule some individual vertices
-			LogicalSlot sA2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot sA1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(sA1);
-			assertNotNull(sA2);
-			
-			// schedule some vertices in the sharing group
-			LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(s1_0);
-			assertNotNull(s1_1);
-			assertNotNull(s2_0);
-			assertNotNull(s2_1);
-			
-			// schedule another isolated vertex
-			LogicalSlot sB1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(sB1);
-			
-			// should not be able to schedule more vertices
-			try {
-				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			try {
-				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			try {
-				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			try {
-				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			// release some isolated task and check that the sharing group may grow
-			sA1.releaseSlot();
-			
-			LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(s1_2);
-			assertNotNull(s2_2);
-			
-			// release three of the previously allocated sub slots, which guarantees to return one shared slot
-			s1_0.releaseSlot();
-			s1_1.releaseSlot();
-			s2_0.releaseSlot();
-			
-			assertEquals(1, testingSlotProvider.getNumberOfAvailableSlots());
-			
-			// schedule one more no-shared task
-			LogicalSlot sB0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(sB0);
-			
-			// release the last of the original shared slots and allocate one more non-shared slot
-			s2_1.releaseSlot();
-			LogicalSlot sB2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(sB2);
-			
-			
-			// release on non-shared and add some shared slots
-			sA2.releaseSlot();
-			LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(s1_3);
-			assertNotNull(s2_3);
-			
-			// release all shared and allocate all in non-shared
-			s1_2.releaseSlot();
-			s2_2.releaseSlot();
-			s1_3.releaseSlot();
-			s2_3.releaseSlot();
-			
-			LogicalSlot sC0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot sC1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			assertNotNull(sC0);
-			assertNotNull(sC1);
-			
-			
-			sB0.releaseSlot();
-			sB1.releaseSlot();
-			sB2.releaseSlot();
-			sC0.releaseSlot();
-			sC1.releaseSlot();
-			
-			// test that everything is released
-			assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * Tests that the scheduler assigns the correct existing shared slots
-	 */
-	@Test
-	public void testLocalizedAssignment1() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-			TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
-			TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
-			
-			// schedule one to each instance
-			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
-			assertNotNull(s1);
-			assertNotNull(s2);
-			
-			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(loc1, s1.getTaskManagerLocation());
-			assertEquals(loc2, s2.getTaskManagerLocation());
-			
-			// schedule one from the other group to each instance
-			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
-			assertNotNull(s3);
-			assertNotNull(s4);
-			
-			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(loc1, s3.getTaskManagerLocation());
-			assertEquals(loc2, s4.getTaskManagerLocation());
-			assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * Tests that the scheduler assigns to new local slots, rather than to existing non-local slots
-	 */
-	@Test
-	public void testLocalizedAssignment2() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-			TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
-			TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
-
-			// schedule one to each instance
-			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			assertNotNull(s1);
-			assertNotNull(s2);
-			
-			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(loc1, s1.getTaskManagerLocation());
-			assertEquals(loc1, s2.getTaskManagerLocation());
-			
-			// schedule one from the other group to each instance
-			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
-			assertNotNull(s3);
-			assertNotNull(s4);
-			
-			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-			assertEquals(loc2, s3.getTaskManagerLocation());
-			assertEquals(loc2, s4.getTaskManagerLocation());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
-			assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * Tests that the scheduler can fall back to non-local
-	 */
-	@Test
-	public void testLocalizedAssignment3() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
-			TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
-			TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
-
-			// schedule until the one instance is full
-			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-
-			// schedule two more with preference of same instance --> need to go to other instance
-			LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-			
-			assertNotNull(s1);
-			assertNotNull(s2);
-			assertNotNull(s3);
-			assertNotNull(s4);
-			assertNotNull(s5);
-			assertNotNull(s6);
-			
-			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-
-			assertEquals(loc1, s1.getTaskManagerLocation());
-			assertEquals(loc1, s2.getTaskManagerLocation());
-			assertEquals(loc1, s3.getTaskManagerLocation());
-			assertEquals(loc1, s4.getTaskManagerLocation());
-			assertEquals(loc2, s5.getTaskManagerLocation());
-			assertEquals(loc2, s6.getTaskManagerLocation());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
-			// Flink supports host localized assignments which happen in this case because all TaskManagerLocations point to the loopback address
-			assertTrue(2 == testingSlotProvider.getNumberOfNonLocalizedAssignments() || 2 == testingSlotProvider.getNumberOfHostLocalizedAssignments());
-
-			assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSequentialAllocateAndRelease() {
-		try {
-			final JobVertexID jid1 = new JobVertexID();
-			final JobVertexID jid2 = new JobVertexID();
-			final JobVertexID jid3 = new JobVertexID();
-			final JobVertexID jid4 = new JobVertexID();
-			
-			final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-			
-			testingSlotProvider.addTaskManager(4);
-
-			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			LogicalSlot slot_1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
-			LogicalSlot slot_2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			LogicalSlot slot_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			LogicalSlot slot_1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_1_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			LogicalSlot slot_2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_2_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			// release groups 1 and 2
-			
-			slot_1_1.releaseSlot();
-			slot_1_2.releaseSlot();
-			slot_1_3.releaseSlot();
-			slot_1_4.releaseSlot();
-			
-			slot_2_1.releaseSlot();
-			slot_2_2.releaseSlot();
-			slot_2_3.releaseSlot();
-			slot_2_4.releaseSlot();
-			
-			// allocate group 4
-			
-			LogicalSlot slot_4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot slot_4_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			// release groups 3 and 4
-			
-			slot_3.releaseSlot();
-			
-			slot_4_1.releaseSlot();
-			slot_4_2.releaseSlot();
-			slot_4_3.releaseSlot();
-			slot_4_4.releaseSlot();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConcurrentAllocateAndRelease() {
-		final ExecutorService executor = Executors.newFixedThreadPool(20);
-
-		try {
-			testingSlotProvider.addTaskManager(4);
-
-			for (int run = 0; run < 50; run++) {
-				final JobVertexID jid1 = new JobVertexID();
-				final JobVertexID jid2 = new JobVertexID();
-				final JobVertexID jid3 = new JobVertexID();
-				final JobVertexID jid4 = new JobVertexID();
-				
-				final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-
-				final AtomicInteger enumerator1 = new AtomicInteger();
-				final AtomicInteger enumerator2 = new AtomicInteger();
-				final AtomicBoolean flag3 = new AtomicBoolean();
-				final AtomicInteger enumerator4 = new AtomicInteger();
-				
-				final Random rnd = new Random();
-				
-				// use atomic boolean as a mutable boolean reference
-				final AtomicBoolean failed = new AtomicBoolean(false);
-				
-				// use atomic integer as a mutable integer reference
-				final AtomicInteger completed = new AtomicInteger();
-				
-				final Runnable deploy4 = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-							sleepUninterruptibly(rnd.nextInt(5));
-							slot.releaseSlot();
-
-							if (completed.incrementAndGet() == 13) {
-								synchronized (completed) {
-									completed.notifyAll();
-								}
-							}
-						}
-						catch (Throwable t) {
-							t.printStackTrace();
-							failed.set(true);
-						}
-					}
-				};
-				
-				final Runnable deploy3 = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							if (flag3.compareAndSet(false, true)) {
-								LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-								sleepUninterruptibly(5);
-								
-								executor.execute(deploy4);
-								executor.execute(deploy4);
-								executor.execute(deploy4);
-								executor.execute(deploy4);
-								
-								slot.releaseSlot();
-								
-								if (completed.incrementAndGet() == 13) {
-									synchronized (completed) {
-										completed.notifyAll();
-									}
-								}
-							}
-						}
-						catch (Throwable t) {
-							t.printStackTrace();
-							failed.set(true);
-						}
-					}
-				};
-				
-				final Runnable deploy2 = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
-							// wait a bit till scheduling the successor
-							sleepUninterruptibly(rnd.nextInt(5));
-							executor.execute(deploy3);
-							
-							// wait a bit until release
-							sleepUninterruptibly(rnd.nextInt(5));
-							slot.releaseSlot();
-							
-							if (completed.incrementAndGet() == 13) {
-								synchronized (completed) {
-									completed.notifyAll();
-								}
-							}
-						}
-						catch (Throwable t) {
-							t.printStackTrace();
-							failed.set(true);
-						}
-					}
-				};
-				
-				final Runnable deploy1 = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
-							// wait a bit till scheduling the successor
-							sleepUninterruptibly(rnd.nextInt(5));
-							executor.execute(deploy2);
-							
-							// wait a bit until release
-							sleepUninterruptibly(rnd.nextInt(5));
-							slot.releaseSlot();
-							
-							if (completed.incrementAndGet() == 13) {
-								synchronized (completed) {
-									completed.notifyAll();
-								}
-							}
-						}
-						catch (Throwable t) {
-							t.printStackTrace();
-							failed.set(true);
-						}
-					}
-				};
-				
-				final Runnable deploy0 = new Runnable() {
-					@Override
-					public void run() {
-						sleepUninterruptibly(rnd.nextInt(10));
-						executor.execute(deploy1);
-					}
-				};
-				executor.execute(deploy0);
-				executor.execute(deploy0);
-				executor.execute(deploy0);
-				executor.execute(deploy0);
-				
-				// wait until all tasks have finished
-				//noinspection SynchronizationOnLocalVariableOrMethodParameter
-				synchronized (completed) {
-					while (!failed.get() && completed.get() < 13) {
-						completed.wait(1000);
-					}
-				}
-				
-				assertFalse("Thread failed", failed.get());
-				
-				while (testingSlotProvider.getNumberOfAvailableSlots() < 4) {
-					sleepUninterruptibly(5);
-				}
-				
-				assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-				assertEquals(13 * (run + 1), testingSlotProvider.getNumberOfUnconstrainedAssignments());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testDopIncreases() {
-		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-			JobVertexID jid3 = new JobVertexID();
-			JobVertexID jid4 = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(4));
-			
-			// schedule one task for the first and second vertex
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			assertEquals( s1.getTaskManagerLocation(), s2.getTaskManagerLocation() );
-			assertEquals(3, scheduler.getNumberOfAvailableSlots());
-			
-			LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			s1.releaseSlot();
-			s2.releaseSlot();
-			
-			LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-			
-			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-				fail("should throw an exception");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-
-			assertEquals(0, scheduler.getNumberOfAvailableSlots());
-			
-			s3_0.releaseSlot();
-			s3_1.releaseSlot();
-			s3_2.releaseSlot();
-			s3_3.releaseSlot();
-			s4_0.releaseSlot();
-			s4_1.releaseSlot();
-			s4_2.releaseSlot();
-			s4_3.releaseSlot();
-			
-			assertEquals(4, scheduler.getNumberOfAvailableSlots());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private static SlotProfile slotProfileForLocation(TaskManagerLocation location) {
-		return new SlotProfile(ResourceProfile.UNKNOWN, Collections.singletonList(location), Collections.emptyList());
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
deleted file mode 100644
index d9919ac..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-public class SchedulerTest extends TestLogger {
-
-	@Test
-	public void testAddAndRemoveInstance() {
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
-		Instance i1 = getRandomInstance(2);
-		Instance i2 = getRandomInstance(2);
-		Instance i3 = getRandomInstance(2);
-
-		assertEquals(0, scheduler.getNumberOfAvailableInstances());
-		assertEquals(0, scheduler.getNumberOfAvailableSlots());
-		scheduler.newInstanceAvailable(i1);
-		assertEquals(1, scheduler.getNumberOfAvailableInstances());
-		assertEquals(2, scheduler.getNumberOfAvailableSlots());
-		scheduler.newInstanceAvailable(i2);
-		assertEquals(2, scheduler.getNumberOfAvailableInstances());
-		assertEquals(4, scheduler.getNumberOfAvailableSlots());
-		scheduler.newInstanceAvailable(i3);
-		assertEquals(3, scheduler.getNumberOfAvailableInstances());
-		assertEquals(6, scheduler.getNumberOfAvailableSlots());
-
-		// cannot add available instance again
-		try {
-			scheduler.newInstanceAvailable(i2);
-			fail("Scheduler accepted instance twice");
-		}
-		catch (IllegalArgumentException e) {
-			// bueno!
-		}
-
-		// some instances die
-		assertEquals(3, scheduler.getNumberOfAvailableInstances());
-		assertEquals(6, scheduler.getNumberOfAvailableSlots());
-		scheduler.instanceDied(i2);
-		assertEquals(2, scheduler.getNumberOfAvailableInstances());
-		assertEquals(4, scheduler.getNumberOfAvailableSlots());
-
-		// try to add a dead instance
-		try {
-			scheduler.newInstanceAvailable(i2);
-			fail("Scheduler accepted dead instance");
-		}
-		catch (IllegalArgumentException e) {
-			// stimmt
-
-		}
-
-		scheduler.instanceDied(i1);
-		assertEquals(1, scheduler.getNumberOfAvailableInstances());
-		assertEquals(2, scheduler.getNumberOfAvailableSlots());
-		scheduler.instanceDied(i3);
-		assertEquals(0, scheduler.getNumberOfAvailableInstances());
-		assertEquals(0, scheduler.getNumberOfAvailableSlots());
-
-		assertFalse(i1.isAlive());
-		assertFalse(i2.isAlive());
-		assertFalse(i3.isAlive());
-	}
-
-	/**
-	 * Tests that the Scheduler times out uncompleted slot futures.
-	 */
-	@Test
-	public void testSlotAllocationTimeout() throws Exception {
-		final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
-
-		final ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph();
-
-		final Map<ExecutionAttemptID, Execution> registeredExecutions = executionGraph.getRegisteredExecutions();
-
-		assertThat(registeredExecutions.values(), Matchers.not(Matchers.empty()));
-
-		final Execution execution = registeredExecutions.values().iterator().next();
-
-		final CompletableFuture<LogicalSlot> slotFuture = scheduler.allocateSlot(
-			new ScheduledUnit(
-				execution),
-			true,
-			SlotProfile.noRequirements(),
-			Time.milliseconds(1L));
-
-		try {
-			slotFuture.get();
-		} catch (ExecutionException ee) {
-			assertThat(ExceptionUtils.stripExecutionException(ee), Matchers.instanceOf(TimeoutException.class));
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 683b0cd..f62f89f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -61,7 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Test base for scheduler related test cases. The test are
- * executed with the {@link Scheduler} and the {@link SlotPool}.
+ * executed with the {@link SlotPool}.
  */
 public class SchedulerTestBase extends TestLogger {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 8ff47c7..8fc04db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -31,11 +31,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +49,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -139,7 +140,7 @@ public class RescalePartitionerTest extends StreamPartitionerTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new TestingSlotProvider(ignored -> new CompletableFuture<>()),
 			ExecutionGraph.class.getClassLoader(),
 			VoidBlobWriter.getInstance(),
 			AkkaUtils.getDefaultTimeout());