You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:10 UTC

[6/8] flink git commit: [FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names

[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names

getResourceID() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eac6088a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eac6088a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eac6088a

Branch: refs/heads/master
Commit: eac6088a75e813a015b778f4cfc4cce0cf2a53ce
Parents: aaa474a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 13:59:01 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../handlers/TaskManagersHandler.java           |   2 +-
 ...PartialInputChannelDeploymentDescriptor.java |  21 +-
 .../apache/flink/runtime/instance/Instance.java |  28 +-
 .../flink/runtime/instance/InstanceManager.java |  20 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  18 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 .../ExecutionGraphMetricsTest.java              | 404 ++++++++++---------
 .../executiongraph/ExecutionGraphTestUtils.java |   2 +-
 .../TerminalStateDeadlockTest.java              |   3 +-
 .../runtime/instance/InstanceManagerTest.java   |  31 +-
 .../flink/runtime/instance/InstanceTest.java    |   6 +-
 .../flink/runtime/instance/SharedSlotsTest.java |  24 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   2 +-
 .../partition/SpilledSubpartitionViewTest.java  |   4 +-
 .../scheduler/CoLocationConstraintTest.java     |   6 +-
 .../ScheduleWithCoLocationHintTest.java         |  22 +-
 .../scheduler/SchedulerSlotSharingTest.java     |  30 +-
 .../scheduler/SchedulerTestUtils.java           |   5 +-
 .../scheduler/SlotAllocationFutureTest.java     |  12 +-
 19 files changed, 319 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b60cd10..b5e9088 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -85,7 +85,7 @@ public class TaskManagersHandler implements RequestHandler {
 					gen.writeStartObject();
 					gen.writeStringField("id", instance.getId().toString());
 					gen.writeStringField("path", instance.getActorGateway().path());
-					gen.writeNumberField("dataPort", instance.getInstanceConnectionInfo().dataPort());
+					gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
 					gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
 					gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
 					gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index e1391a4..0eac39d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -44,7 +44,7 @@ public class PartialInputChannelDeploymentDescriptor {
 	private final ResultPartitionID partitionID;
 
 	/** The partition connection info. */
-	private final TaskManagerLocation partitionConnectionInfo;
+	private final TaskManagerLocation partitionTaskManagerLocation;
 
 	/** The partition connection index. */
 	private final int partitionConnectionIndex;
@@ -52,12 +52,12 @@ public class PartialInputChannelDeploymentDescriptor {
 	public PartialInputChannelDeploymentDescriptor(
 			IntermediateDataSetID resultId,
 			ResultPartitionID partitionID,
-			TaskManagerLocation partitionConnectionInfo,
+			TaskManagerLocation partitionTaskManagerLocation,
 			int partitionConnectionIndex) {
 
 		this.resultId = checkNotNull(resultId);
 		this.partitionID = checkNotNull(partitionID);
-		this.partitionConnectionInfo = checkNotNull(partitionConnectionInfo);
+		this.partitionTaskManagerLocation = checkNotNull(partitionTaskManagerLocation);
 		this.partitionConnectionIndex = partitionConnectionIndex;
 	}
 
@@ -66,23 +66,20 @@ public class PartialInputChannelDeploymentDescriptor {
 	 *
 	 * @see InputChannelDeploymentDescriptor
 	 */
-	public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(
-			Execution consumerExecution) {
+	public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(Execution consumerExecution) {
+		checkNotNull(consumerExecution, "consumerExecution");
 
-		checkNotNull(consumerExecution, "Consumer execution null");
-
-		TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
-
-		checkNotNull(consumerConnectionInfo, "Consumer connection info null");
+		TaskManagerLocation consumerLocation = consumerExecution.getAssignedResourceLocation();
+		checkNotNull(consumerLocation, "Consumer connection info null");
 
 		final ResultPartitionLocation partitionLocation;
 
-		if (consumerConnectionInfo.equals(partitionConnectionInfo)) {
+		if (consumerLocation.equals(partitionTaskManagerLocation)) {
 			partitionLocation = ResultPartitionLocation.createLocal();
 		}
 		else {
 			partitionLocation = ResultPartitionLocation.createRemote(
-					new ConnectionID(partitionConnectionInfo, partitionConnectionIndex));
+					new ConnectionID(partitionTaskManagerLocation, partitionConnectionIndex));
 		}
 
 		return new InputChannelDeploymentDescriptor(partitionID, partitionLocation);

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index fe46895..4a8139b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -52,14 +52,11 @@ public class Instance implements SlotOwner {
 	private final ActorGateway actorGateway;
 
 	/** The instance connection information for the data transfer. */
-	private final TaskManagerLocation connectionInfo;
+	private final TaskManagerLocation location;
 
 	/** A description of the resources of the task manager */
 	private final HardwareDescription resources;
 
-	/** The ID identifies the resource the task manager runs on */
-	private final ResourceID resourceId;
-
 	/** The ID identifying the taskManager. */
 	private final InstanceID instanceId;
 
@@ -90,22 +87,19 @@ public class Instance implements SlotOwner {
 	 * Constructs an instance reflecting a registered TaskManager.
 	 *
 	 * @param actorGateway The actor gateway to communicate with the remote instance
-	 * @param connectionInfo The remote connection where the task manager receives requests.
-	 * @param resourceId The resource id which denotes the resource the task manager uses.
+	 * @param location The remote connection where the task manager receives requests.
 	 * @param id The id under which the taskManager is registered.
 	 * @param resources The resources available on the machine.
 	 * @param numberOfSlots The number of task slots offered by this taskManager.
 	 */
 	public Instance(
 			ActorGateway actorGateway,
-			TaskManagerLocation connectionInfo,
-			ResourceID resourceId,
+			TaskManagerLocation location,
 			InstanceID id,
 			HardwareDescription resources,
 			int numberOfSlots) {
 		this.actorGateway = actorGateway;
-		this.connectionInfo = connectionInfo;
-		this.resourceId = resourceId;
+		this.location = location;
 		this.instanceId = id;
 		this.resources = resources;
 		this.numberOfSlots = numberOfSlots;
@@ -120,8 +114,8 @@ public class Instance implements SlotOwner {
 	// Properties
 	// --------------------------------------------------------------------------------------------
 
-	public ResourceID getResourceId() {
-		return resourceId;
+	public ResourceID getTaskManagerID() {
+		return location.getResourceID();
 	}
 
 	public InstanceID getId() {
@@ -246,7 +240,7 @@ public class Instance implements SlotOwner {
 				return null;
 			}
 			else {
-				SimpleSlot slot = new SimpleSlot(jobID, this, connectionInfo, nextSlot, actorGateway);
+				SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -284,7 +278,7 @@ public class Instance implements SlotOwner {
 			}
 			else {
 				SharedSlot slot = new SharedSlot(
-						jobID, this, connectionInfo, nextSlot, actorGateway, sharingGroupAssignment);
+						jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -355,8 +349,8 @@ public class Instance implements SlotOwner {
 		return actorGateway;
 	}
 
-	public TaskManagerLocation getInstanceConnectionInfo() {
-		return connectionInfo;
+	public TaskManagerLocation getTaskManagerLocation() {
+		return location;
 	}
 
 	public int getNumberOfAvailableSlots() {
@@ -405,7 +399,7 @@ public class Instance implements SlotOwner {
 
 	@Override
 	public String toString() {
-		return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
+		return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
 				numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index e7a4537..0c7e187 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -138,8 +138,7 @@ public class InstanceManager {
 	 * for the job execution.
 	 *
 	 * @param taskManager ActorRef to the TaskManager which wants to be registered
-	 * @param resourceID The resource id of the TaskManager
-	 * @param connectionInfo ConnectionInfo of the TaskManager
+	 * @param taskManagerLocation Location info of the TaskManager
 	 * @param resources Hardware description of the TaskManager
 	 * @param numberOfSlots Number of available slots on the TaskManager
 	 * @param leaderSessionID The current leader session ID of the JobManager
@@ -147,12 +146,12 @@ public class InstanceManager {
 	 */
 	public InstanceID registerTaskManager(
 			ActorRef taskManager,
-			ResourceID resourceID,
-			TaskManagerLocation connectionInfo,
+			TaskManagerLocation taskManagerLocation,
 			HardwareDescription resources,
 			int numberOfSlots,
-			UUID leaderSessionID){
-		synchronized(this.lock){
+			UUID leaderSessionID) {
+		
+		synchronized (this.lock) {
 			if (this.isShutdown) {
 				throw new IllegalStateException("InstanceManager is shut down.");
 			}
@@ -174,12 +173,11 @@ public class InstanceManager {
 
 			InstanceID instanceID = new InstanceID();
 
-			Instance host = new Instance(actorGateway, connectionInfo, resourceID, instanceID,
-				resources, numberOfSlots);
+			Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);
 
 			registeredHostsById.put(instanceID, host);
 			registeredHostsByConnection.put(taskManager, host);
-			registeredHostsByResource.put(resourceID, host);
+			registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
 
 			totalNumberOfAliveTaskSlots += numberOfSlots;
 
@@ -187,7 +185,7 @@ public class InstanceManager {
 				LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " +
 								"Current number of registered hosts is %d. " +
 								"Current number of alive task slots is %d.",
-						connectionInfo.getHostname(),
+						taskManagerLocation.getHostname(),
 						taskManager.path(),
 						instanceID,
 						registeredHostsById.size(),
@@ -217,7 +215,7 @@ public class InstanceManager {
 
 			registeredHostsByConnection.remove(host);
 			registeredHostsById.remove(instance.getId());
-			registeredHostsByResource.remove(instance.getResourceId());
+			registeredHostsByResource.remove(instance.getTaskManagerID());
 
 			if (terminated) {
 				deadHosts.add(instance.getActorGateway().actor());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b481b55..734972d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -363,7 +363,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
-					this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
+					this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
 				}
 				
 				if (slot != null) {
@@ -425,7 +425,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
-					this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
+					this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
 				}
 
 				if (sharedSlot != null) {
@@ -469,7 +469,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		while (this.newlyAvailableInstances.size() > 0) {
 			Instance queuedInstance = this.newlyAvailableInstances.poll();
 			if (queuedInstance != null) {
-				this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), queuedInstance);
+				this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
 			}
 		}
 		
@@ -583,7 +583,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				}
 			}
 			else {
-				this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+				this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
 			}
 		}
 	}
@@ -649,7 +649,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				instance.setSlotAvailabilityListener(this);
 				
 				// store the instance in the by-host-lookup
-				String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
+				String instanceHostName = instance.getTaskManagerLocation().getHostname();
 				Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
 				if (instanceSet == null) {
 					instanceSet = new HashSet<Instance>();
@@ -658,7 +658,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				instanceSet.add(instance);
 
 				// add it to the available resources and let potential waiters know
-				this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+				this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
 
 				// add all slots as available
 				for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
@@ -693,9 +693,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 
 		allInstances.remove(instance);
-		instancesWithAvailableResources.remove(instance.getResourceId());
+		instancesWithAvailableResources.remove(instance.getTaskManagerID());
 
-		String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
+		String instanceHostName = instance.getTaskManagerLocation().getHostname();
 		Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
 		if (instanceSet != null) {
 			instanceSet.remove(instance);
@@ -795,7 +795,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 
 			while ((instance = newlyAvailableInstances.poll()) != null) {
 				if (instance.hasResourcesAvailable()) {
-					instancesWithAvailableResources.put(instance.getResourceId(), instance);
+					instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a0ecc2..88af604 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -349,7 +349,7 @@ class JobManager(
       currentResourceManager = Option(msg.resourceManager())
 
       val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
-        instance => instance.getResourceId).toList.asJava
+        instance => instance.getTaskManagerID).toList.asJava
 
       // confirm registration and send known task managers with their resource ids
       sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
@@ -425,7 +425,6 @@ class JobManager(
         try {
           val instanceID = instanceManager.registerTaskManager(
             taskManager,
-            resourceId,
             connectionInfo,
             hardwareInformation,
             numberOfSlots,

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d8bd6cb..d5520fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -47,8 +48,10 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.mockito.Matchers;
+
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
@@ -60,7 +63,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -76,200 +80,210 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 	 */
 	@Test
 	public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
-		// setup execution graph with mocked scheduling logic
-		int parallelism = 1;
-
-		JobVertex jobVertex = new JobVertex("TestVertex");
-		jobVertex.setParallelism(parallelism);
-		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-		JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
-
-		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
-
-		Configuration jobConfig = new Configuration();
-
-		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		MetricRegistry metricRegistry = new MetricRegistry(config);
-
-		assertTrue(metricRegistry.getReporters().size() == 1);
-
-		MetricReporter reporter = metricRegistry.getReporters().get(0);
-
-		assertTrue(reporter instanceof TestingReporter);
-
-		TestingReporter testingReporter = (TestingReporter) reporter;
-
-		MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
-
-		Scheduler scheduler = mock(Scheduler.class);
-
-		SimpleSlot simpleSlot = mock(SimpleSlot.class);
-
-		Instance instance = mock(Instance.class);
-
-		TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-
-		Slot rootSlot = mock(Slot.class);
-
-		ActorGateway actorGateway = mock(ActorGateway.class);
-
-		when(simpleSlot.isAlive()).thenReturn(true);
-		when(simpleSlot.getTaskManagerID()).thenReturn(instance.getResourceId());
-		when(simpleSlot.getTaskManagerLocation()).thenReturn(instance.getInstanceConnectionInfo());
-		when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
-		when(simpleSlot.getRoot()).thenReturn(rootSlot);
-
-		when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
-
-		when(instance.getInstanceConnectionInfo()).thenReturn(taskManagerLocation);
-		when(instance.getActorGateway()).thenReturn(actorGateway);
-		when(taskManagerLocation.getHostname()).thenReturn("localhost");
-
-		when(rootSlot.getSlotNumber()).thenReturn(0);
-
-		when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
-
-		TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
-
-		ExecutionGraph executionGraph = new ExecutionGraph(
-			ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()),
-			jobGraph.getJobID(),
-			jobGraph.getName(),
-			jobConfig,
-			new SerializedValue<ExecutionConfig>(null),
-			timeout,
-			testingRestartStrategy,
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList(),
-			getClass().getClassLoader(),
-			metricGroup);
-
-		// get restarting time metric
-		Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
-
-		assertNotNull(metric);
-		assertTrue(metric instanceof Gauge);
-
-		@SuppressWarnings("unchecked")
-		Gauge<Long> restartingTime = (Gauge<Long>) metric;
-
-		// check that the restarting time is 0 since it's the initial start
-		assertTrue(0L == restartingTime.getValue());
-
-		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		// start execution
-		executionGraph.scheduleForExecution(scheduler);
-
-		assertTrue(0L == restartingTime.getValue());
-
-		List<ExecutionAttemptID> executionIDs = new ArrayList<>();
-
-		for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
-			executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
-		}
-
-		// tell execution graph that the tasks are in state running --> job status switches to state running
-		for (ExecutionAttemptID executionID : executionIDs) {
-			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
-		}
-
-		assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
-		assertTrue(0L == restartingTime.getValue());
-
-		// fail the job so that it goes into state restarting
-		for (ExecutionAttemptID executionID : executionIDs) {
-			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
-		}
-
-		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-
-		long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-
-		// wait some time so that the restarting time gauge shows a value different from 0
-		Thread.sleep(50);
-
-		long previousRestartingTime = restartingTime.getValue();
-
-		// check that the restarting time is monotonically increasing
-		for (int i = 0; i < 10; i++) {
-			long currentRestartingTime = restartingTime.getValue();
-
-			assertTrue(currentRestartingTime >= previousRestartingTime);
-			previousRestartingTime = currentRestartingTime;
-		}
-
-		// check that we have measured some restarting time
-		assertTrue(previousRestartingTime > 0);
-
-		// restart job
-		testingRestartStrategy.restartExecutionGraph();
-
-		executionIDs.clear();
-
-		for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
-			executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
-		}
-
-		for (ExecutionAttemptID executionID : executionIDs) {
-			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
-		}
-
-		assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
-		assertTrue(firstRestartingTimestamp != 0);
-
-		previousRestartingTime = restartingTime.getValue();
-
-		// check that the restarting time does not increase after we've reached the running state
-		for (int i = 0; i < 10; i++) {
-			long currentRestartingTime = restartingTime.getValue();
-
-			assertTrue(currentRestartingTime == previousRestartingTime);
-			previousRestartingTime = currentRestartingTime;
-		}
-
-		// fail job again
-		for (ExecutionAttemptID executionID : executionIDs) {
-			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
-		}
-
-		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-
-		long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-
-		assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
-
-		Thread.sleep(50);
-
-		previousRestartingTime = restartingTime.getValue();
-
-		// check that the restarting time is increasing again
-		for (int i = 0; i < 10; i++) {
-			long currentRestartingTime = restartingTime.getValue();
-
-			assertTrue(currentRestartingTime >= previousRestartingTime);
-			previousRestartingTime = currentRestartingTime;
-		}
-
-		assertTrue(previousRestartingTime > 0);
-
-		// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
-		executionGraph.fail(new Exception());
-
-		assertEquals(JobStatus.FAILED, executionGraph.getState());
-
-		previousRestartingTime = restartingTime.getValue();
-
-		for (int i = 0; i < 10; i++) {
-			long currentRestartingTime = restartingTime.getValue();
-
-			assertTrue(currentRestartingTime == previousRestartingTime);
-			previousRestartingTime = currentRestartingTime;
+		final ExecutorService executor = Executors.newCachedThreadPool();
+		try {
+			// setup execution graph with mocked scheduling logic
+			int parallelism = 1;
+	
+			JobVertex jobVertex = new JobVertex("TestVertex");
+			jobVertex.setParallelism(parallelism);
+			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
+	
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
+	
+			Configuration jobConfig = new Configuration();
+	
+			FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+	
+			MetricRegistry metricRegistry = new MetricRegistry(config);
+	
+			assertTrue(metricRegistry.getReporters().size() == 1);
+	
+			MetricReporter reporter = metricRegistry.getReporters().get(0);
+	
+			assertTrue(reporter instanceof TestingReporter);
+	
+			TestingReporter testingReporter = (TestingReporter) reporter;
+	
+			MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
+	
+			Scheduler scheduler = mock(Scheduler.class);
+
+			ResourceID taskManagerId = ResourceID.generate();
+
+			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+			when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId);
+			when(taskManagerLocation.getHostname()).thenReturn("localhost");
+
+			ActorGateway actorGateway = mock(ActorGateway.class);
+
+			Instance instance = mock(Instance.class);
+			when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
+			when(instance.getTaskManagerID()).thenReturn(taskManagerId);
+			when(instance.getActorGateway()).thenReturn(actorGateway);
+
+			Slot rootSlot = mock(Slot.class);
+
+			SimpleSlot simpleSlot = mock(SimpleSlot.class);
+			when(simpleSlot.isAlive()).thenReturn(true);
+			when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
+			when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId);
+			when(simpleSlot.getTaskManagerActorGateway()).thenReturn(actorGateway);
+			when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
+			when(simpleSlot.getRoot()).thenReturn(rootSlot);
+
+			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+
+			
+
+			when(rootSlot.getSlotNumber()).thenReturn(0);
+
+			when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
+
+			TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
+
+			ExecutionGraph executionGraph = new ExecutionGraph(
+				ExecutionContext$.MODULE$.fromExecutor(executor),
+				jobGraph.getJobID(),
+				jobGraph.getName(),
+				jobConfig,
+				new SerializedValue<ExecutionConfig>(null),
+				timeout,
+				testingRestartStrategy,
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				getClass().getClassLoader(),
+				metricGroup);
+	
+			// get restarting time metric
+			Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
+	
+			assertNotNull(metric);
+			assertTrue(metric instanceof Gauge);
+	
+			@SuppressWarnings("unchecked")
+			Gauge<Long> restartingTime = (Gauge<Long>) metric;
+	
+			// check that the restarting time is 0 since it's the initial start
+			assertTrue(0L == restartingTime.getValue());
+	
+			executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+	
+			// start execution
+			executionGraph.scheduleForExecution(scheduler);
+	
+			assertTrue(0L == restartingTime.getValue());
+	
+			List<ExecutionAttemptID> executionIDs = new ArrayList<>();
+	
+			for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
+				executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+			}
+	
+			// tell execution graph that the tasks are in state running --> job status switches to state running
+			for (ExecutionAttemptID executionID : executionIDs) {
+				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+			}
+	
+			assertEquals(JobStatus.RUNNING, executionGraph.getState());
+	
+			assertTrue(0L == restartingTime.getValue());
+	
+			// fail the job so that it goes into state restarting
+			for (ExecutionAttemptID executionID : executionIDs) {
+				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
+			}
+	
+			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+	
+			long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+	
+			// wait some time so that the restarting time gauge shows a value different from 0
+			Thread.sleep(50);
+	
+			long previousRestartingTime = restartingTime.getValue();
+	
+			// check that the restarting time is monotonically increasing
+			for (int i = 0; i < 10; i++) {
+				long currentRestartingTime = restartingTime.getValue();
+	
+				assertTrue(currentRestartingTime >= previousRestartingTime);
+				previousRestartingTime = currentRestartingTime;
+			}
+	
+			// check that we have measured some restarting time
+			assertTrue(previousRestartingTime > 0);
+	
+			// restart job
+			testingRestartStrategy.restartExecutionGraph();
+	
+			executionIDs.clear();
+	
+			for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
+				executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+			}
+	
+			for (ExecutionAttemptID executionID : executionIDs) {
+				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+			}
+	
+			assertEquals(JobStatus.RUNNING, executionGraph.getState());
+	
+			assertTrue(firstRestartingTimestamp != 0);
+	
+			previousRestartingTime = restartingTime.getValue();
+	
+			// check that the restarting time does not increase after we've reached the running state
+			for (int i = 0; i < 10; i++) {
+				long currentRestartingTime = restartingTime.getValue();
+	
+				assertTrue(currentRestartingTime == previousRestartingTime);
+				previousRestartingTime = currentRestartingTime;
+			}
+	
+			// fail job again
+			for (ExecutionAttemptID executionID : executionIDs) {
+				executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
+			}
+	
+			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+	
+			long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+	
+			assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
+	
+			Thread.sleep(50);
+	
+			previousRestartingTime = restartingTime.getValue();
+	
+			// check that the restarting time is increasing again
+			for (int i = 0; i < 10; i++) {
+				long currentRestartingTime = restartingTime.getValue();
+	
+				assertTrue(currentRestartingTime >= previousRestartingTime);
+				previousRestartingTime = currentRestartingTime;
+			}
+	
+			assertTrue(previousRestartingTime > 0);
+	
+			// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
+			executionGraph.fail(new Exception());
+	
+			assertEquals(JobStatus.FAILED, executionGraph.getState());
+	
+			previousRestartingTime = restartingTime.getValue();
+	
+			for (int i = 0; i < 10; i++) {
+				long currentRestartingTime = restartingTime.getValue();
+	
+				assertTrue(currentRestartingTime == previousRestartingTime);
+				previousRestartingTime = currentRestartingTime;
+			}
+		} finally {
+			executor.shutdownNow();
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index cddb6cb..df47c3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -113,7 +113,7 @@ public class ExecutionGraphTestUtils {
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
-		return new Instance(gateway, connection, resourceID, new InstanceID(), hardwareDescription, numberOfSlots);
+		return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index a71faf6..870ae05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -84,8 +84,7 @@ public class TerminalStateDeadlockTest {
 			TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
 				
 			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
-			Instance instance = new Instance(DummyActorGateway.INSTANCE, ci,
-				resourceId, new InstanceID(), resources, 4);
+			Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
 
 			this.resource = instance.allocateSimpleSlot(new JobID());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index f1ed960..f3747c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -87,12 +87,9 @@ public class InstanceManagerTest{
 			final JavaTestKit probe2 = new JavaTestKit(system);
 			final JavaTestKit probe3 = new JavaTestKit(system);
 
-			cm.registerTaskManager(probe1.getRef(), resID1,
-				ici1, hardwareDescription, 1, leaderSessionID);
-			cm.registerTaskManager(probe2.getRef(), resID2,
-				ici2, hardwareDescription, 2, leaderSessionID);
-			cm.registerTaskManager(probe3.getRef(), resID3,
-				ici3, hardwareDescription, 5, leaderSessionID);
+			cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID);
+			cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 2, leaderSessionID);
+			cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 5, leaderSessionID);
 
 			assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(8, cm.getTotalNumberOfSlots());
@@ -102,7 +99,7 @@ public class InstanceManagerTest{
 					HashSet<TaskManagerLocation>();
 
 			for(Instance instance: instances){
-				taskManagerLocations.add(instance.getInstanceConnectionInfo());
+				taskManagerLocations.add(instance.getTaskManagerLocation());
 			}
 
 			assertTrue(taskManagerLocations.contains(ici1));
@@ -133,14 +130,13 @@ public class InstanceManagerTest{
 			TaskManagerLocation ici = new TaskManagerLocation(resID1, address, dataPort);
 
 			JavaTestKit probe = new JavaTestKit(system);
-			cm.registerTaskManager(probe.getRef(), resID1,
-				ici, resources, 1, leaderSessionID);
+			cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
 
 			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(1, cm.getTotalNumberOfSlots());
 
 			try {
-				cm.registerTaskManager(probe.getRef(), resID2, ici, resources, 1, leaderSessionID);
+				cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
 			} catch (Exception e) {
 				// good
 			}
@@ -182,12 +178,12 @@ public class InstanceManagerTest{
 			JavaTestKit probe2 = new JavaTestKit(system);
 			JavaTestKit probe3 = new JavaTestKit(system);
 
-			InstanceID instanceID1 = cm.registerTaskManager(probe1.getRef(), resID1,
-				ici1, hardwareDescription, 1, leaderSessionID);
-			InstanceID instanceID2 = cm.registerTaskManager(probe2.getRef(), resID2,
-				ici2, hardwareDescription, 1, leaderSessionID);
-			InstanceID instanceID3 = cm.registerTaskManager(probe3.getRef(), resID3,
-				ici3, hardwareDescription, 1, leaderSessionID);
+			InstanceID instanceID1 = cm.registerTaskManager(
+					probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID);
+			InstanceID instanceID2 = cm.registerTaskManager(
+					probe2.getRef(), ici2, hardwareDescription, 1, leaderSessionID);
+			InstanceID instanceID3 = cm.registerTaskManager(
+					probe3.getRef(), ici3, hardwareDescription, 1, leaderSessionID);
 
 			// report some immediate heart beats
 			assertTrue(cm.reportHeartBeat(instanceID1, new byte[] {}));
@@ -241,8 +237,7 @@ public class InstanceManagerTest{
 				TaskManagerLocation ici = new TaskManagerLocation(resID, address, 20000);
 
 				JavaTestKit probe = new JavaTestKit(system);
-				cm.registerTaskManager(probe.getRef(), resID,
-					ici, resources, 1, leaderSessionID);
+				cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
 				fail("Should raise exception in shutdown state");
 			}
 			catch (IllegalStateException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 82d3723..aee62fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -42,7 +42,7 @@ public class InstanceTest {
 			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-					resourceID, new InstanceID(), hardwareDescription, 4);
+					new InstanceID(), hardwareDescription, 4);
 
 			assertEquals(4, instance.getTotalNumberOfSlots());
 			assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -105,7 +105,7 @@ public class InstanceTest {
 			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-					resourceID, new InstanceID(), hardwareDescription, 3);
+					new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -137,7 +137,7 @@ public class InstanceTest {
 			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-					resourceID, new InstanceID(), hardwareDescription, 3);
+					new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 2c40e89..0edef5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -132,7 +132,7 @@ public class SharedSlotsTest {
 			assertEquals(Locality.LOCAL, sub1.getLocality());
 			assertEquals(1, sub1.getNumberLeaves());
 			assertEquals(vid1, sub1.getGroupID());
-			assertEquals(instance.getResourceId(), sub1.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID());
 			assertEquals(jobId, sub1.getJobID());
 			assertEquals(sharedSlot, sub1.getParent());
 			assertEquals(sharedSlot, sub1.getRoot());
@@ -151,7 +151,7 @@ public class SharedSlotsTest {
 			assertEquals(Locality.UNCONSTRAINED, sub2.getLocality());
 			assertEquals(1, sub2.getNumberLeaves());
 			assertEquals(vid2, sub2.getGroupID());
-			assertEquals(instance.getResourceId(), sub2.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID());
 			assertEquals(jobId, sub2.getJobID());
 			assertEquals(sharedSlot, sub2.getParent());
 			assertEquals(sharedSlot, sub2.getRoot());
@@ -163,14 +163,14 @@ public class SharedSlotsTest {
 			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3));
 			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
 			
-			SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getInstanceConnectionInfo()));
+			SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getTaskManagerLocation()));
 			assertNotNull(sub3);
 			
 			assertNull(sub3.getExecutedVertex());
 			assertEquals(Locality.LOCAL, sub3.getLocality());
 			assertEquals(1, sub3.getNumberLeaves());
 			assertEquals(vid3, sub3.getGroupID());
-			assertEquals(instance.getResourceId(), sub3.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID());
 			assertEquals(jobId, sub3.getJobID());
 			assertEquals(sharedSlot, sub3.getParent());
 			assertEquals(sharedSlot, sub3.getRoot());
@@ -183,14 +183,14 @@ public class SharedSlotsTest {
 			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
 
 			SimpleSlot sub4 = assignment.getSlotForTask(vid4,
-					Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getInstanceConnectionInfo()));
+					Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getTaskManagerLocation()));
 			assertNotNull(sub4);
 			
 			assertNull(sub4.getExecutedVertex());
 			assertEquals(Locality.NON_LOCAL, sub4.getLocality());
 			assertEquals(1, sub4.getNumberLeaves());
 			assertEquals(vid4, sub4.getGroupID());
-			assertEquals(instance.getResourceId(), sub4.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID());
 			assertEquals(jobId, sub4.getJobID());
 			assertEquals(sharedSlot, sub4.getParent());
 			assertEquals(sharedSlot, sub4.getRoot());
@@ -456,7 +456,7 @@ public class SharedSlotsTest {
 			assertNotNull(constraint.getSharedSlot());
 			assertTrue(constraint.isAssigned());
 			assertTrue(constraint.isAssignedAndAlive());
-			assertEquals(instance.getInstanceConnectionInfo(), constraint.getLocation());
+			assertEquals(instance.getTaskManagerLocation(), constraint.getLocation());
 			
 			SimpleSlot tailSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet());
 			
@@ -475,7 +475,7 @@ public class SharedSlotsTest {
 			assertTrue(tailSlot.isReleased());
 			assertTrue(constraint.isAssigned());
 			assertFalse(constraint.isAssignedAndAlive());
-			assertEquals(instance.getInstanceConnectionInfo(), constraint.getLocation());
+			assertEquals(instance.getTaskManagerLocation(), constraint.getLocation());
 			
 			// we should have resources again for the co-location constraint
 			assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
@@ -488,10 +488,10 @@ public class SharedSlotsTest {
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
 			
 			// verify some basic properties of the slots
-			assertEquals(instance.getResourceId(), sourceSlot.getTaskManagerID());
-			assertEquals(instance.getResourceId(), headSlot.getTaskManagerID());
-			assertEquals(instance.getResourceId(), tailSlot.getTaskManagerID());
-			assertEquals(instance.getResourceId(), sinkSlot.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), sourceSlot.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), headSlot.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), tailSlot.getTaskManagerID());
+			assertEquals(instance.getTaskManagerID(), sinkSlot.getTaskManagerID());
 
 			assertEquals(sourceId, sourceSlot.getGroupID());
 			assertEquals(sinkId, sinkSlot.getGroupID());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 82c2a74..c690d36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -150,7 +150,7 @@ public class SimpleSlotTest {
 		TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 		Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-				resourceID, new InstanceID(), hardwareDescription, 1);
+				new InstanceID(), hardwareDescription, 1);
 		return instance.allocateSimpleSlot(new JobID());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fff7bc6..5722cac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -36,6 +35,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -121,7 +121,7 @@ public class SpilledSubpartitionViewTest {
 				}
 			}
 
-			final List<Future<Boolean>> results = Lists.newArrayList();
+			final List<Future<Boolean>> results = new ArrayList<>();
 
 			// Submit the consuming tasks
 			for (ResultSubpartitionView view : readers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 3bd4368..1344aef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -140,7 +140,7 @@ public class CoLocationConstraintTest {
 			// now, the location is assigned and we have a location
 			assertTrue(constraint.isAssigned());
 			assertTrue(constraint.isAssignedAndAlive());
-			assertEquals(instance2, constraint.getLocation());
+			assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
 			
 			// release the slot
 			slot2_1.releaseSlot();
@@ -148,7 +148,7 @@ public class CoLocationConstraintTest {
 			// we should still have a location
 			assertTrue(constraint.isAssigned());
 			assertFalse(constraint.isAssignedAndAlive());
-			assertEquals(instance2, constraint.getLocation());
+			assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
 
 			// we can not assign a different location
 			try {
@@ -167,7 +167,7 @@ public class CoLocationConstraintTest {
 
 			assertTrue(constraint.isAssigned());
 			assertTrue(constraint.isAssignedAndAlive());
-			assertEquals(instance2, constraint.getLocation());
+			assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 5b7d18a..eab4fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -326,8 +326,8 @@ public class ScheduleWithCoLocationHintTest {
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
-			TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
@@ -398,8 +398,8 @@ public class ScheduleWithCoLocationHintTest {
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
-			TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
@@ -425,8 +425,8 @@ public class ScheduleWithCoLocationHintTest {
 			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
 			
 			// still preserves the previous instance mapping)
-			assertEquals(i1.getResourceId(), s3.getTaskManagerID());
-			assertEquals(i2.getResourceId(), s4.getTaskManagerID());
+			assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
+			assertEquals(i2.getTaskManagerID(), s4.getTaskManagerID());
 			
 			s3.releaseSlot();
 			s4.releaseSlot();
@@ -455,8 +455,8 @@ public class ScheduleWithCoLocationHintTest {
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
-			TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
@@ -516,7 +516,7 @@ public class ScheduleWithCoLocationHintTest {
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
@@ -580,8 +580,8 @@ public class ScheduleWithCoLocationHintTest {
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
-			TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index a683834..fd0523b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -108,10 +108,10 @@ public class SchedulerSlotSharingTest {
 			
 			// make sure we have two slots on the first instance, and two on the second
 			int c = 0;
-			c += (s5.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
-			c += (s6.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
-			c += (s7.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
-			c += (s8.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
+			c += (s5.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+			c += (s6.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+			c += (s7.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+			c += (s8.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
 			assertEquals(0, c);
 			
 			// release all
@@ -637,8 +637,8 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
-			TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(i1);
@@ -690,8 +690,8 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
-			TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(i1);
@@ -743,7 +743,7 @@ public class SchedulerSlotSharingTest {
 			Instance i1 = getRandomInstance(2);
 			Instance i2 = getRandomInstance(2);
 
-			TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
+			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
 
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 			scheduler.newInstanceAvailable(i1);
@@ -771,12 +771,12 @@ public class SchedulerSlotSharingTest {
 			assertEquals(0, i1.getNumberOfAvailableSlots());
 			assertEquals(0, i2.getNumberOfAvailableSlots());
 			
-			assertEquals(i1.getResourceId(), s1.getTaskManagerID());
-			assertEquals(i1.getResourceId(), s2.getTaskManagerID());
-			assertEquals(i1.getResourceId(), s3.getTaskManagerID());
-			assertEquals(i1.getResourceId(), s4.getTaskManagerID());
-			assertEquals(i2.getResourceId(), s5.getTaskManagerID());
-			assertEquals(i2.getResourceId(), s6.getTaskManagerID());
+			assertEquals(i1.getTaskManagerID(), s1.getTaskManagerID());
+			assertEquals(i1.getTaskManagerID(), s2.getTaskManagerID());
+			assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
+			assertEquals(i1.getTaskManagerID(), s4.getTaskManagerID());
+			assertEquals(i2.getTaskManagerID(), s5.getTaskManagerID());
+			assertEquals(i2.getTaskManagerID(), s6.getTaskManagerID());
 			
 			// check the scheduler's bookkeeping
 			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index eef27a8..d040ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -69,8 +69,7 @@ public class SchedulerTestUtils {
 		final long GB = 1024L*1024*1024;
 		HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
 		
-		return new Instance(DummyActorGateway.INSTANCE, ci, resourceID,
-			new InstanceID(), resources, numSlots);
+		return new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, numSlots);
 	}
 	
 	
@@ -88,7 +87,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(Instance... preferredInstances) {
 		List<TaskManagerLocation> locations = new ArrayList<>(preferredInstances.length);
 		for (Instance i : preferredInstances) {
-			locations.add(i.getInstanceConnectionInfo());
+			locations.add(i.getTaskManagerLocation());
 		}
 		return getTestVertex(locations);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
index d9c100c..ea0d2cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -56,9 +56,9 @@ public class SlotAllocationFutureTest {
 			final Instance instance2 = SchedulerTestUtils.getRandomInstance(1);
 
 			final SimpleSlot slot1 = new SimpleSlot(new JobID(), instance1,
-					instance1.getInstanceConnectionInfo(), 0, instance1.getActorGateway(), null, null);
+					instance1.getTaskManagerLocation(), 0, instance1.getActorGateway(), null, null);
 			final SimpleSlot slot2 = new SimpleSlot(new JobID(), instance2,
-					instance2.getInstanceConnectionInfo(), 0, instance2.getActorGateway(), null, null);
+					instance2.getTaskManagerLocation(), 0, instance2.getActorGateway(), null, null);
 			
 			future.setSlot(slot1);
 			try {
@@ -85,7 +85,7 @@ public class SlotAllocationFutureTest {
 				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
-						instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
 				
 				SlotAllocationFuture future = new SlotAllocationFuture();
 				
@@ -108,7 +108,7 @@ public class SlotAllocationFutureTest {
 				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
 				
 				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
-						instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
 				
 				SlotAllocationFuture future = new SlotAllocationFuture();
 				future.setSlot(thisSlot);
@@ -141,7 +141,7 @@ public class SlotAllocationFutureTest {
 				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
-						instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
 				
 				final SlotAllocationFuture future = new SlotAllocationFuture();
 				
@@ -181,7 +181,7 @@ public class SlotAllocationFutureTest {
 				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
 
 				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, 
-						instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
 				final SlotAllocationFuture future = new SlotAllocationFuture();
 
 				future.setSlot(thisSlot);