You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:12 UTC

[8/8] flink git commit: [FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation

[FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation

This adds the ResourceId to the TaskManagerLocation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34cda87a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34cda87a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34cda87a

Branch: refs/heads/master
Commit: 34cda87a6e831c6ea62f1a79df08d1efac718d03
Parents: e227b10
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 29 16:58:31 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosTaskManager.scala     |   7 +-
 .../handlers/JobExceptionsHandler.java          |   4 +-
 .../handlers/JobVertexDetailsHandler.java       |   4 +-
 .../handlers/JobVertexTaskManagersHandler.java  |   4 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |   4 +-
 .../SubtasksAllAccumulatorsHandler.java         |   4 +-
 .../handlers/SubtasksTimesHandler.java          |   4 +-
 ...PartialInputChannelDeploymentDescriptor.java |  10 +-
 .../flink/runtime/executiongraph/Execution.java |   8 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../apache/flink/runtime/instance/Instance.java |   7 +-
 .../instance/InstanceConnectionInfo.java        | 267 -------------------
 .../flink/runtime/instance/InstanceManager.java |   3 +-
 .../flink/runtime/io/network/ConnectionID.java  |   4 +-
 .../taskmanager/TaskManagerLocation.java        | 257 ++++++++++++++++++
 .../runtime/messages/RegistrationMessages.scala |  12 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  15 +-
 .../testingUtils/TestingTaskManager.scala       |  33 ++-
 .../ExecutionGraphMetricsTest.java              |   8 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   7 +-
 .../TerminalStateDeadlockTest.java              |   7 +-
 .../VertexLocationConstraintTest.java           |   6 +-
 .../instance/InstanceConnectionInfoTest.java    | 200 --------------
 .../runtime/instance/InstanceManagerTest.java   |  64 +++--
 .../flink/runtime/instance/InstanceTest.java    |  16 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   6 +-
 .../scheduler/SchedulerTestUtils.java           |  25 +-
 .../resourcemanager/ResourceManagerITCase.java  |   5 +-
 ...askManagerComponentsStartupShutdownTest.java |  12 +-
 .../taskmanager/TaskManagerLocationTest.java    | 201 ++++++++++++++
 .../jobmanager/JobManagerRegistrationTest.scala |  14 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |  20 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   7 +-
 33 files changed, 618 insertions(+), 631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index d8b6775..19b0c62 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.
@@ -32,7 +31,7 @@ import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfigurati
 class MesosTaskManager(
     config: TaskManagerConfiguration,
     resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
+    taskManagerLocation: TaskManagerLocation,
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
@@ -41,7 +40,7 @@ class MesosTaskManager(
   extends TaskManager(
     config,
     resourceID,
-    connectionInfo,
+    taskManagerLocation,
     memoryManager,
     ioManager,
     network,

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 7b6a361..ce154e3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -66,7 +66,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 					break;
 				}
 
-				InstanceConnectionInfo location = task.getCurrentAssignedResourceLocation();
+				TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
 				String locationString = location != null ?
 						location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index d4e885e..813ecb8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -61,7 +61,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			final ExecutionState status = vertex.getExecutionState();
 			
-			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
 
 			long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index befc0bf..cbdb87f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -51,7 +51,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 		Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
 
 		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
 
 			List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 3d80b23..a1e6d0e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -45,7 +45,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		final ExecutionState status = execAttempt.getState();
 		final long now = System.currentTimeMillis();
 
-		InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
+		TaskManagerLocation location = execAttempt.getAssignedResourceLocation();
 		String locationString = location == null ? "(unassigned)" : location.getHostname();
 
 		long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 6d9ce3a..780bd4b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -52,7 +52,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 		int num = 0;
 		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
 
-			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String locationString = location == null ? "(unassigned)" : location.getHostname();
 			
 			gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 03d40dc..9e6276d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -70,7 +70,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 			gen.writeStartObject();
 			gen.writeNumberField("subtask", num++);
 
-			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String locationString = location == null ? "(unassigned)" : location.getHostname();
 			gen.writeStringField("host", locationString);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index a3cfcd9..e1391a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -44,7 +44,7 @@ public class PartialInputChannelDeploymentDescriptor {
 	private final ResultPartitionID partitionID;
 
 	/** The partition connection info. */
-	private final InstanceConnectionInfo partitionConnectionInfo;
+	private final TaskManagerLocation partitionConnectionInfo;
 
 	/** The partition connection index. */
 	private final int partitionConnectionIndex;
@@ -52,7 +52,7 @@ public class PartialInputChannelDeploymentDescriptor {
 	public PartialInputChannelDeploymentDescriptor(
 			IntermediateDataSetID resultId,
 			ResultPartitionID partitionID,
-			InstanceConnectionInfo partitionConnectionInfo,
+			TaskManagerLocation partitionConnectionInfo,
 			int partitionConnectionIndex) {
 
 		this.resultId = checkNotNull(resultId);
@@ -71,7 +71,7 @@ public class PartialInputChannelDeploymentDescriptor {
 
 		checkNotNull(consumerExecution, "Consumer execution null");
 
-		InstanceConnectionInfo consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
+		TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
 
 		checkNotNull(consumerConnectionInfo, "Consumer connection info null");
 
@@ -107,7 +107,7 @@ public class PartialInputChannelDeploymentDescriptor {
 		final IntermediateResult result = partition.getIntermediateResult();
 
 		final IntermediateDataSetID resultId = result.getId();
-		final InstanceConnectionInfo partitionConnectionInfo = producer.getAssignedResourceLocation();
+		final TaskManagerLocation partitionConnectionInfo = producer.getAssignedResourceLocation();
 		final int partitionConnectionIndex = result.getConnectionIndex();
 
 		return new PartialInputChannelDeploymentDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index efddecc..197999c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
@@ -133,7 +133,7 @@ public class Execution {
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
-	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
+	private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
 
 	private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
 
@@ -147,7 +147,7 @@ public class Execution {
 	
 	/* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
 	* by partial accumulators on a late heartbeat*/
-	private final SerializableObject accumulatorLock = new SerializableObject();
+	private final Object accumulatorLock = new Object();
 
 	/* Continuously updated map of user-defined accumulators */
 	private volatile Map<String, Accumulator<?, ?>> userAccumulators;
@@ -202,7 +202,7 @@ public class Execution {
 		return assignedResource;
 	}
 
-	public InstanceConnectionInfo getAssignedResourceLocation() {
+	public TaskManagerLocation getAssignedResourceLocation() {
 		return assignedResourceLocation;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b215394..e5a115a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -228,7 +228,7 @@ public class ExecutionVertex {
 		return currentExecution.getAssignedResource();
 	}
 
-	public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
+	public TaskManagerLocation getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 119f060..598b32b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@ public class Instance {
 	private final ActorGateway actorGateway;
 
 	/** The instance connection information for the data transfer. */
-	private final InstanceConnectionInfo connectionInfo;
+	private final TaskManagerLocation connectionInfo;
 
 	/** A description of the resources of the task manager */
 	private final HardwareDescription resources;
@@ -92,7 +93,7 @@ public class Instance {
 	 */
 	public Instance(
 			ActorGateway actorGateway,
-			InstanceConnectionInfo connectionInfo,
+			TaskManagerLocation connectionInfo,
 			ResourceID resourceId,
 			InstanceID id,
 			HardwareDescription resources,
@@ -350,7 +351,7 @@ public class Instance {
 		return actorGateway;
 	}
 
-	public InstanceConnectionInfo getInstanceConnectionInfo() {
+	public TaskManagerLocation getInstanceConnectionInfo() {
 		return connectionInfo;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
deleted file mode 100644
index 2830f04..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class encapsulates the connection information of a TaskManager.
- * It describes the host where the TaskManager operates and its server port
- * for data exchange. This class also contains utilities to work with the
- * TaskManager's host name, which is used to localize work assignments.
- */
-public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, Serializable {
-
-	private static final long serialVersionUID = -8254407801276350716L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(InstanceConnectionInfo.class);
-	
-
-	/**
-	 * The network address the instance's task manager binds its sockets to.
-	 */
-	private InetAddress inetAddress;
-
-	/**
-	 * The port the instance's task manager expects to receive transfer envelopes on.
-	 */
-	private int dataPort;
-
-	/**
-	 * The fully qualified host name of the instance.
-	 */
-	private String fqdnHostName;
-	
-	/**
-	 * The hostname, derived from the fully qualified host name.
-	 */
-	private String hostName;
-
-
-	/**
-	 * Constructs a new instance connection info object. The constructor will attempt to retrieve the instance's
-	 * host name and domain name through the operating system's lookup mechanisms.
-	 * 
-	 * @param inetAddress
-	 *        the network address the instance's task manager binds its sockets to
-	 * @param dataPort
-	 *        the port instance's task manager expects to receive transfer envelopes on
-	 */
-	public InstanceConnectionInfo(InetAddress inetAddress, int dataPort) {
-		if (inetAddress == null) {
-			throw new IllegalArgumentException("Argument inetAddress must not be null");
-		}
-
-		// -1 indicates a local instance connection info
-		if (dataPort != -1 && dataPort <= 0) {
-			throw new IllegalArgumentException("Argument dataPort must be greater than zero");
-		}
-
-		this.dataPort = dataPort;
-		this.inetAddress = inetAddress;
-		
-		// get FQDN hostname on this TaskManager.
-		try {
-			this.fqdnHostName = this.inetAddress.getCanonicalHostName();
-		}
-		catch (Throwable t) {
-			LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
-					"for HDFS files) may be non-local when the canonical hostname is missing.");
-			LOG.debug("getCanonicalHostName() Exception:", t);
-			this.fqdnHostName = this.inetAddress.getHostAddress();
-		}
-
-		if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
-			// this happens when the name lookup fails, either due to an exception,
-			// or because no hostname can be found for the address
-			// take IP textual representation
-			this.hostName = this.fqdnHostName;
-			LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
-					+ "Local input split assignment (such as for HDFS files) may be impacted.",
-					this.inetAddress.getHostAddress());
-		}
-		else {
-			this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
-		}
-	}
-
-	/**
-	 * Constructs an empty object.
-	 */
-	public InstanceConnectionInfo() {}
-
-
-	/**
-	 * Returns the port instance's task manager expects to receive transfer envelopes on.
-	 * 
-	 * @return the port instance's task manager expects to receive transfer envelopes on
-	 */
-	public int dataPort() {
-		return this.dataPort;
-	}
-
-	/**
-	 * Returns the network address the instance's task manager binds its sockets to.
-	 * 
-	 * @return the network address the instance's task manager binds its sockets to
-	 */
-	public InetAddress address() {
-		return this.inetAddress;
-	}
-
-	/**
-	 * Returns the fully-qualified domain name the TaskManager. If the name could not be
-	 * determined, the return value will be a textual representation of the TaskManager's IP address.
-	 * 
-	 * @return The fully-qualified domain name of the TaskManager.
-	 */
-	public String getFQDNHostname() {
-		return this.fqdnHostName;
-	}
-
-	/**
-	 * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
-	 * domain name (FQDN, see {@link #getFQDNHostname()}):
-	 * <ul>
-	 *     <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li>
-	 *     <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is
-	 *         used as the hostname.</li>
-	 *     <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first
-	 *         segment (here "worker3") will be used as the hostname.</li>
-	 * </ul>
-	 *
-	 * @return The hostname of the TaskManager.
-	 */
-	public String getHostname() {
-		return hostName;
-	}
-
-	/**
-	 * Gets the IP address where the TaskManager operates.
-	 *
-	 * @return The IP address.
-	 */
-	public String getInetAdress() {
-		return this.inetAddress.toString();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Serialization
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-
-		final int addr_length = in.readInt();
-		byte[] address = new byte[addr_length];
-		in.readFully(address);
-		
-		this.dataPort = in.readInt();
-		
-		this.fqdnHostName = StringUtils.readNullableString(in);
-		this.hostName = StringUtils.readNullableString(in);
-
-		try {
-			this.inetAddress = InetAddress.getByAddress(address);
-		} catch (UnknownHostException e) {
-			throw new IOException("This lookup should never fail.", e);
-		}
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		out.writeInt(this.inetAddress.getAddress().length);
-		out.write(this.inetAddress.getAddress());
-		
-		out.writeInt(this.dataPort);
-		
-		StringUtils.writeNullableString(fqdnHostName, out);
-		StringUtils.writeNullableString(hostName, out);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Utilities
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return getFQDNHostname() + " (dataPort=" + dataPort + ")";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof InstanceConnectionInfo) {
-			InstanceConnectionInfo other = (InstanceConnectionInfo) obj;
-			return this.dataPort == other.dataPort &&
-					this.inetAddress.equals(other.inetAddress);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return this.inetAddress.hashCode() +
-				17*dataPort;
-	}
-
-	@Override
-	public int compareTo(InstanceConnectionInfo o) {
-		// decide based on address first
-		byte[] thisAddress = this.inetAddress.getAddress();
-		byte[] otherAddress = o.inetAddress.getAddress();
-		
-		if (thisAddress.length < otherAddress.length) {
-			return -1;
-		} else if (thisAddress.length > otherAddress.length) {
-			return 1;
-		} else {
-			for (int i = 0; i < thisAddress.length; i++) {
-				byte tb = thisAddress[i];
-				byte ob = otherAddress[i];
-				if (tb < ob) {
-					return -1;
-				} else if (tb > ob) {
-					return 1;
-				}
-			}
-		}
-		
-		// addresses are identical, decide based on ports.
-		if (this.dataPort < o.dataPort) {
-			return -1;
-		} else if (this.dataPort > o.dataPort) {
-			return 1;
-		} else {
-			return 0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0d0d4c7..e7a4537 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 
 import akka.actor.ActorRef;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,7 +148,7 @@ public class InstanceManager {
 	public InstanceID registerTaskManager(
 			ActorRef taskManager,
 			ResourceID resourceID,
-			InstanceConnectionInfo connectionInfo,
+			TaskManagerLocation connectionInfo,
 			HardwareDescription resources,
 			int numberOfSlots,
 			UUID leaderSessionID){

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 0569dae..cc2a19d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
@@ -43,7 +43,7 @@ public class ConnectionID implements Serializable {
 
 	private final int connectionIndex;
 
-	public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+	public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
 		this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
new file mode 100644
index 0000000..5a0faa5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import java.net.InetAddress;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class encapsulates the connection information of a TaskManager.
+ * It describes the host where the TaskManager operates and its server port
+ * for data exchange. This class also contains utilities to work with the
+ * TaskManager's host name, which is used to localize work assignments.
+ */
+public class TaskManagerLocation implements Comparable<TaskManagerLocation>, java.io.Serializable {
+
+	private static final long serialVersionUID = -8254407801276350716L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLocation.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The ID of the resource in which the TaskManager is started. This can be for example
+	 * the YARN container ID, Mesos container ID, or any other unique identifier. */
+	private final ResourceID resourceID;
+
+	/** The network address that the TaskManager binds its sockets to */
+	private final InetAddress inetAddress;
+
+	/** The fully qualified host name of the TaskManager */
+	private final String fqdnHostName;
+
+	/** The pure hostname, derived from the fully qualified host name. */
+	private final String hostName;
+	
+	/** The port that the TaskManager receive data transport connection requests at */
+	private final int dataPort;
+
+	/** The toString representation, eagerly constructed and cached to avoid repeated string building */  
+	private final String stringRepresentation;
+
+	/**
+	 * Constructs a new instance connection info object. The constructor will attempt to retrieve the instance's
+	 * host name and domain name through the operating system's lookup mechanisms.
+	 * 
+	 * @param inetAddress
+	 *        the network address the instance's task manager binds its sockets to
+	 * @param dataPort
+	 *        the port instance's task manager expects to receive transfer envelopes on
+	 */
+	public TaskManagerLocation(ResourceID resourceID, InetAddress inetAddress, int dataPort) {
+		// -1 indicates a local instance connection info
+		checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)");
+
+		this.resourceID = checkNotNull(resourceID);
+		this.inetAddress = checkNotNull(inetAddress);
+		this.dataPort = dataPort;
+
+		// get FQDN hostname on this TaskManager.
+		String fqdnHostName;
+		try {
+			fqdnHostName = this.inetAddress.getCanonicalHostName();
+		}
+		catch (Throwable t) {
+			LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
+					"for HDFS files) may be non-local when the canonical hostname is missing.");
+			LOG.debug("getCanonicalHostName() Exception:", t);
+			fqdnHostName = this.inetAddress.getHostAddress();
+		}
+		this.fqdnHostName = fqdnHostName;
+
+		if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
+			// this happens when the name lookup fails, either due to an exception,
+			// or because no hostname can be found for the address
+			// take IP textual representation
+			this.hostName = this.fqdnHostName;
+			LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
+					+ "Local input split assignment (such as for HDFS files) may be impacted.",
+					this.inetAddress.getHostAddress());
+		}
+		else {
+			this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
+		}
+
+		this.stringRepresentation = String.format(
+				"TaskManager (%s) @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Getters
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the ID of the resource in which the TaskManager is started. The format of this depends
+	 * on how the TaskManager is started:
+	 * <ul>
+	 *     <li>If the TaskManager is started via YARN, this is the YARN container ID.</li>
+	 *     <li>If the TaskManager is started via Mesos, this is the Mesos container ID.</li>
+	 *     <li>If the TaskManager is started in standalone mode, or via a MiniCluster, this is a random ID.</li>
+	 *     <li>Other deployment modes can set the resource ID in other ways.</li>
+	 * </ul>
+	 * 
+	 * @return The ID of the resource in which the TaskManager is started
+	 */
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	/**
+	 * Returns the port instance's task manager expects to receive transfer envelopes on.
+	 * 
+	 * @return the port instance's task manager expects to receive transfer envelopes on
+	 */
+	public int dataPort() {
+		return dataPort;
+	}
+
+	/**
+	 * Returns the network address the instance's task manager binds its sockets to.
+	 * 
+	 * @return the network address the instance's task manager binds its sockets to
+	 */
+	public InetAddress address() {
+		return inetAddress;
+	}
+
+	/**
+	 * Gets the IP address where the TaskManager operates.
+	 *
+	 * @return The IP address.
+	 */
+	public String addressString() {
+		return inetAddress.toString();
+	}
+
+	/**
+	 * Returns the fully-qualified domain name the TaskManager. If the name could not be
+	 * determined, the return value will be a textual representation of the TaskManager's IP address.
+	 * 
+	 * @return The fully-qualified domain name of the TaskManager.
+	 */
+	public String getFQDNHostname() {
+		return fqdnHostName;
+	}
+
+	/**
+	 * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
+	 * domain name (FQDN, see {@link #getFQDNHostname()}):
+	 * <ul>
+	 *     <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li>
+	 *     <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is
+	 *         used as the hostname.</li>
+	 *     <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first
+	 *         segment (here "worker3") will be used as the hostname.</li>
+	 * </ul>
+	 *
+	 * @return The hostname of the TaskManager.
+	 */
+	public String getHostname() {
+		return hostName;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return stringRepresentation;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj != null && obj.getClass() == TaskManagerLocation.class) {
+			TaskManagerLocation that = (TaskManagerLocation) obj;
+			return this.resourceID.equals(that.resourceID) &&
+					this.inetAddress.equals(that.inetAddress) &&
+					this.dataPort == that.dataPort;
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return resourceID.hashCode() + 
+				17 * inetAddress.hashCode() +
+				129 * dataPort;
+	}
+
+	@Override
+	public int compareTo(@Nonnull TaskManagerLocation o) {
+		// decide based on address first
+		int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
+		if (resourceIdCmp != 0) {
+			return resourceIdCmp;
+		}
+
+		// decide based on ip address next
+		byte[] thisAddress = this.inetAddress.getAddress();
+		byte[] otherAddress = o.inetAddress.getAddress();
+
+		if (thisAddress.length < otherAddress.length) {
+			return -1;
+		} else if (thisAddress.length > otherAddress.length) {
+			return 1;
+		} else {
+			for (int i = 0; i < thisAddress.length; i++) {
+				byte tb = thisAddress[i];
+				byte ob = otherAddress[i];
+				if (tb < ob) {
+					return -1;
+				} else if (tb > ob) {
+					return 1;
+				}
+			}
+		}
+
+		// addresses are identical, decide based on ports.
+		if (this.dataPort < o.dataPort) {
+			return -1;
+		} else if (this.dataPort > o.dataPort) {
+			return 1;
+		} else {
+			return 0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index d362164..5648bc6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.messages
 
 import java.util.UUID
 
-import akka.actor.ActorRef
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{HardwareDescription, InstanceID}
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
@@ -63,10 +63,10 @@ object RegistrationMessages {
    * @param numberOfSlots The number of processing slots offered by the TaskManager.
    */
   case class RegisterTaskManager(
-      resourceId: ResourceID,
-      connectionInfo: InstanceConnectionInfo,
-      resources: HardwareDescription,
-      numberOfSlots: Int)
+                                  resourceId: ResourceID,
+                                  connectionInfo: TaskManagerLocation,
+                                  resources: HardwareDescription,
+                                  numberOfSlots: Int)
     extends RegistrationMessage
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3154826..84750a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
@@ -127,7 +127,7 @@ import scala.util.{Failure, Success}
 class TaskManager(
     protected val config: TaskManagerConfiguration,
     protected val resourceID: ResourceID,
-    protected val connectionInfo: InstanceConnectionInfo,
+    protected val location: TaskManagerLocation,
     protected val memoryManager: MemoryManager,
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
@@ -189,7 +189,7 @@ class TaskManager(
   var leaderSessionID: Option[UUID] = None
 
   private val runtimeInfo = new TaskManagerRuntimeInfo(
-       connectionInfo.getHostname(),
+       location.getHostname(),
        new UnmodifiableConfiguration(config.configuration),
        config.tmpDirPaths)
 
@@ -209,7 +209,7 @@ class TaskManager(
    */
   override def preStart(): Unit = {
     log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.")
-    log.info(s"TaskManager data connection information: $connectionInfo")
+    log.info(s"TaskManager data connection information: $location")
     log.info(s"TaskManager has $numberOfSlots task slot(s).")
 
     // log the initial memory utilization
@@ -601,7 +601,7 @@ class TaskManager(
             jobManager ! decorateMessage(
               RegisterTaskManager(
                 resourceID,
-                connectionInfo,
+                location,
                 resources,
                 numberOfSlots)
             )
@@ -1884,7 +1884,8 @@ object TaskManager {
 
     network.start()
 
-    val connectionInfo = new InstanceConnectionInfo(
+    val taskManagerLocation = new TaskManagerLocation(
+      resourceID,
       taskManagerAddress.getAddress(),
       network.getConnectionManager().getDataPort())
 
@@ -1994,7 +1995,7 @@ object TaskManager {
       taskManagerClass,
       taskManagerConfig,
       resourceID,
-      connectionInfo,
+      taskManagerLocation,
       memoryManager,
       ioManager,
       network,

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 2597753..9b5a147 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,26 +19,25 @@
 package org.apache.flink.runtime.testingUtils
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
 
 import scala.language.postfixOps
 
 /** Subclass of the [[TaskManager]] to support testing messages
  */
 class TestingTaskManager(
-    config: TaskManagerConfiguration,
-    resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
-    memoryManager: MemoryManager,
-    ioManager: IOManager,
-    network: NetworkEnvironment,
-    numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
+                          config: TaskManagerConfiguration,
+                          resourceID: ResourceID,
+                          connectionInfo: TaskManagerLocation,
+                          memoryManager: MemoryManager,
+                          ioManager: IOManager,
+                          network: NetworkEnvironment,
+                          numberOfSlots: Int,
+                          leaderRetrievalService: LeaderRetrievalService)
   extends TaskManager(
     config,
     resourceID,
@@ -51,13 +50,13 @@ class TestingTaskManager(
   with TestingTaskManagerLike {
 
   def this(
-      config: TaskManagerConfiguration,
-      connectionInfo: InstanceConnectionInfo,
-      memoryManager: MemoryManager,
-      ioManager: IOManager,
-      network: NetworkEnvironment,
-      numberOfSlots: Int,
-      leaderRetrievalService: LeaderRetrievalService) {
+            config: TaskManagerConfiguration,
+            connectionInfo: TaskManagerLocation,
+            memoryManager: MemoryManager,
+            ioManager: IOManager,
+            network: NetworkEnvironment,
+            numberOfSlots: Int,
+            leaderRetrievalService: LeaderRetrievalService) {
     this(
       config,
       ResourceID.generate(),

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 2b8b867..cf7cf58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -110,7 +110,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 		Instance instance = mock(Instance.class);
 
-		InstanceConnectionInfo instanceConnectionInfo = mock(InstanceConnectionInfo.class);
+		TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 
 		Slot rootSlot = mock(Slot.class);
 
@@ -123,9 +123,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 		when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
 
-		when(instance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo);
+		when(instance.getInstanceConnectionInfo()).thenReturn(taskManagerLocation);
 		when(instance.getActorGateway()).thenReturn(actorGateway);
-		when(instanceConnectionInfo.getHostname()).thenReturn("localhost");
+		when(taskManagerLocation.getHostname()).thenReturn("localhost");
 
 		when(rootSlot.getSlotNumber()).thenReturn(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 903d5f9..cddb6cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -108,11 +108,12 @@ public class ExecutionGraphTestUtils {
 	}
 
 	public static Instance getInstance(final ActorGateway gateway, final int numberOfSlots) throws Exception {
+		ResourceID resourceID = ResourceID.generate();
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
-		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+		TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
-		return new Instance(gateway, connection, ResourceID.generate(), new InstanceID(), hardwareDescription, numberOfSlots);
+		return new Instance(gateway, connection, resourceID, new InstanceID(), hardwareDescription, numberOfSlots);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 2a690d9..a71faf6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -79,12 +79,13 @@ public class TerminalStateDeadlockTest {
 			this.execGraphSchedulerField.setAccessible(true);
 			
 			// the dummy resource
+			ResourceID resourceId = ResourceID.generate();
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
+			TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
 				
 			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, ci,
-				ResourceID.generate(), new InstanceID(), resources, 4);
+				resourceId, new InstanceID(), resources, 4);
 
 			this.resource = instance.allocateSimpleSlot(new JobID());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 4ee06b36..91472ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -443,10 +443,10 @@ public class VertexLocationConstraintTest {
 	public static Instance getInstance(byte[] ipAddress, int dataPort, String hostname) throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		
-		InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class);
+		TaskManagerLocation connection = mock(TaskManagerLocation.class);
 		when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
 		when(connection.dataPort()).thenReturn(dataPort);
-		when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
+		when(connection.addressString()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
 		when(connection.getHostname()).thenReturn(hostname);
 		when(connection.getFQDNHostname()).thenReturn(hostname);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
deleted file mode 100644
index 3a9488d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.net.InetAddress;
-
-import org.apache.flink.util.InstantiationUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the InstanceConnectionInfo, which identifies the location and connection
- * information of a TaskManager.
- */
-public class InstanceConnectionInfoTest {
-
-	@Test
-	public void testEqualsHashAndCompareTo() {
-		try {
-			// we mock the addresses to save the times of the reverse name lookups
-			InetAddress address1 = mock(InetAddress.class);
-			when(address1.getCanonicalHostName()).thenReturn("localhost");
-			when(address1.getHostName()).thenReturn("localhost");
-			when(address1.getHostAddress()).thenReturn("127.0.0.1");
-			when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} );
-
-			InetAddress address2 = mock(InetAddress.class);
-			when(address2.getCanonicalHostName()).thenReturn("testhost1");
-			when(address2.getHostName()).thenReturn("testhost1");
-			when(address2.getHostAddress()).thenReturn("0.0.0.0");
-			when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} );
-
-			InetAddress address3 = mock(InetAddress.class);
-			when(address3.getCanonicalHostName()).thenReturn("testhost2");
-			when(address3.getHostName()).thenReturn("testhost2");
-			when(address3.getHostAddress()).thenReturn("192.168.0.1");
-			when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} );
-
-			// one == four != two != three
-			InstanceConnectionInfo one = new InstanceConnectionInfo(address1, 19871);
-			InstanceConnectionInfo two = new InstanceConnectionInfo(address2, 19871);
-			InstanceConnectionInfo three = new InstanceConnectionInfo(address3, 10871);
-			InstanceConnectionInfo four = new InstanceConnectionInfo(address1, 19871);
-			
-			assertTrue(one.equals(four));
-			assertTrue(!one.equals(two));
-			assertTrue(!one.equals(three));
-			assertTrue(!two.equals(three));
-			assertTrue(!three.equals(four));
-			
-			assertTrue(one.compareTo(four) == 0);
-			assertTrue(four.compareTo(one) == 0);
-			assertTrue(one.compareTo(two) != 0);
-			assertTrue(one.compareTo(three) != 0);
-			assertTrue(two.compareTo(three) != 0);
-			assertTrue(three.compareTo(four) != 0);
-			
-			{
-				int val = one.compareTo(two);
-				assertTrue(two.compareTo(one) == -val);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSerialization() {
-		try {
-			// without resolved hostname
-			{
-				InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
-				
-				InstanceConnectionInfo copy = InstantiationUtil.createCopyWritable(original);
-				assertEquals(original, copy);
-				
-				InstanceConnectionInfo serCopy = InstantiationUtil.clone(original);
-				assertEquals(original, serCopy);
-			}
-						
-			// with resolved hostname
-			{
-				InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
-				original.getFQDNHostname();
-				
-				InstanceConnectionInfo copy = InstantiationUtil.createCopyWritable(original);
-				assertEquals(original, copy);
-				
-				InstanceConnectionInfo serCopy = InstantiationUtil.clone(original);
-				assertEquals(original, serCopy);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGetFQDNHostname() {
-		try {
-			InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
-			assertNotNull(info1.getFQDNHostname());
-			
-			InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
-			assertNotNull(info2.getFQDNHostname());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGetHostname0() {
-		try {
-			InetAddress address = mock(InetAddress.class);
-			when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
-			when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
-			when(address.getHostAddress()).thenReturn("127.0.0.1");
-
-			final InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
-			Assert.assertEquals("worker2", info.getHostname());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGetHostname1() {
-		try {
-			InetAddress address = mock(InetAddress.class);
-			when(address.getCanonicalHostName()).thenReturn("worker10");
-			when(address.getHostName()).thenReturn("worker10");
-			when(address.getHostAddress()).thenReturn("127.0.0.1");
-
-			InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
-			Assert.assertEquals("worker10", info.getHostname());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGetHostname2() {
-		try {
-			final String addressString = "192.168.254.254";
-
-			// we mock the addresses to save the times of the reverse name lookups
-			InetAddress address = mock(InetAddress.class);
-			when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
-			when(address.getHostName()).thenReturn("192.168.254.254");
-			when(address.getHostAddress()).thenReturn("192.168.254.254");
-			when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
-
-			InstanceConnectionInfo info = new InstanceConnectionInfo(address, 54152);
-
-			assertNotNull(info.getFQDNHostname());
-			assertTrue(info.getFQDNHostname().equals(addressString));
-
-			assertNotNull(info.getHostname());
-			assertTrue(info.getHostname().equals(addressString));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index ff5e2ab..f1ed960 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -18,24 +18,11 @@
 
 package org.apache.flink.runtime.instance;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 
@@ -44,6 +31,18 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
  */
@@ -76,13 +75,13 @@ public class InstanceManagerTest{
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 
 			// register three instances
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 15);
-			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 30);
-
 			ResourceID resID1 = ResourceID.generate();
 			ResourceID resID2 = ResourceID.generate();
 			ResourceID resID3 = ResourceID.generate();
+			
+			TaskManagerLocation ici1 = new TaskManagerLocation(resID1, address, dataPort);
+			TaskManagerLocation ici2 = new TaskManagerLocation(resID2, address, dataPort + 15);
+			TaskManagerLocation ici3 = new TaskManagerLocation(resID3, address, dataPort + 30);
 
 			final JavaTestKit probe1 = new JavaTestKit(system);
 			final JavaTestKit probe2 = new JavaTestKit(system);
@@ -99,16 +98,16 @@ public class InstanceManagerTest{
 			assertEquals(8, cm.getTotalNumberOfSlots());
 
 			Collection<Instance> instances = cm.getAllRegisteredInstances();
-			Set<InstanceConnectionInfo> instanceConnectionInfos = new
-					HashSet<InstanceConnectionInfo>();
+			Set<TaskManagerLocation> taskManagerLocations = new
+					HashSet<TaskManagerLocation>();
 
 			for(Instance instance: instances){
-				instanceConnectionInfos.add(instance.getInstanceConnectionInfo());
+				taskManagerLocations.add(instance.getInstanceConnectionInfo());
 			}
 
-			assertTrue(instanceConnectionInfos.contains(ici1));
-			assertTrue(instanceConnectionInfos.contains(ici2));
-			assertTrue(instanceConnectionInfos.contains(ici3));
+			assertTrue(taskManagerLocations.contains(ici1));
+			assertTrue(taskManagerLocations.contains(ici2));
+			assertTrue(taskManagerLocations.contains(ici3));
 
 			cm.shutdown();
 		}
@@ -131,7 +130,7 @@ public class InstanceManagerTest{
 
 			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort);
+			TaskManagerLocation ici = new TaskManagerLocation(resID1, address, dataPort);
 
 			JavaTestKit probe = new JavaTestKit(system);
 			cm.registerTaskManager(probe.getRef(), resID1,
@@ -141,13 +140,12 @@ public class InstanceManagerTest{
 			assertEquals(1, cm.getTotalNumberOfSlots());
 
 			try {
-				cm.registerTaskManager(probe.getRef(), resID2,
-					ici, resources, 1, leaderSessionID);
+				cm.registerTaskManager(probe.getRef(), resID2, ici, resources, 1, leaderSessionID);
 			} catch (Exception e) {
 				// good
 			}
 
-			// check for correct number of registerede instances
+			// check for correct number of registered instances
 			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(1, cm.getTotalNumberOfSlots());
 
@@ -176,9 +174,9 @@ public class InstanceManagerTest{
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 
 			// register three instances
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 1);
-			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 2);
+			TaskManagerLocation ici1 = new TaskManagerLocation(resID1, address, dataPort);
+			TaskManagerLocation ici2 = new TaskManagerLocation(resID2, address, dataPort + 1);
+			TaskManagerLocation ici3 = new TaskManagerLocation(resID3, address, dataPort + 2);
 
 			JavaTestKit probe1 = new JavaTestKit(system);
 			JavaTestKit probe2 = new JavaTestKit(system);
@@ -240,7 +238,7 @@ public class InstanceManagerTest{
 				ResourceID resID = ResourceID.generate();
 				HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
 				InetAddress address = InetAddress.getByName("127.0.0.1");
-				InstanceConnectionInfo ici = new InstanceConnectionInfo(address, 20000);
+				TaskManagerLocation ici = new TaskManagerLocation(resID, address, 20000);
 
 				JavaTestKit probe = new JavaTestKit(system);
 				cm.registerTaskManager(probe.getRef(), resID,

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index faa679b..82d3723 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.junit.Test;
 
 /**
@@ -35,12 +36,13 @@ public class InstanceTest {
 	@Test
 	public void testAllocatingAndCancellingSlots() {
 		try {
+			ResourceID resourceID = ResourceID.generate();
 			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-				ResourceID.generate(), new InstanceID(), hardwareDescription, 4);
+					resourceID, new InstanceID(), hardwareDescription, 4);
 
 			assertEquals(4, instance.getTotalNumberOfSlots());
 			assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -97,12 +99,13 @@ public class InstanceTest {
 	@Test
 	public void testInstanceDies() {
 		try {
+			ResourceID resourceID = ResourceID.generate();
 			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-				ResourceID.generate(), new InstanceID(), hardwareDescription, 3);
+					resourceID, new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -128,12 +131,13 @@ public class InstanceTest {
 	@Test
 	public void testCancelAllSlots() {
 		try {
+			ResourceID resourceID = ResourceID.generate();
 			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 			Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-				ResourceID.generate(), new InstanceID(), hardwareDescription, 3);
+					resourceID, new InstanceID(), hardwareDescription, 3);
 
 			assertEquals(3, instance.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 459a3ed..82c2a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.junit.Test;
 
 import org.mockito.Matchers;
@@ -143,12 +144,13 @@ public class SimpleSlotTest {
 	}
 
 	public static SimpleSlot getSlot() throws Exception {
+		ResourceID resourceID = ResourceID.generate();
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
-		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+		TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
 
 		Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
-			ResourceID.generate(), new InstanceID(), hardwareDescription, 1);
+				resourceID, new InstanceID(), hardwareDescription, 1);
 		return instance.allocateSimpleSlot(new JobID());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 983d6e6..99360e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -52,7 +51,8 @@ public class SchedulerTestUtils {
 			throw new IllegalArgumentException();
 		}
 		
-		InetAddress address;
+		final ResourceID resourceID = ResourceID.generate();
+		final InetAddress address;
 		try {
 			address = InetAddress.getByName("127.0.0.1");
 		}
@@ -62,12 +62,12 @@ public class SchedulerTestUtils {
 		
 		int dataPort = port.getAndIncrement();
 		
-		InstanceConnectionInfo ci = new InstanceConnectionInfo(address, dataPort);
+		TaskManagerLocation ci = new TaskManagerLocation(resourceID, address, dataPort);
 		
 		final long GB = 1024L*1024*1024;
 		HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
 		
-		return new Instance(DummyActorGateway.INSTANCE, ci, ResourceID.generate(),
+		return new Instance(DummyActorGateway.INSTANCE, ci, resourceID,
 			new InstanceID(), resources, numSlots);
 	}
 	
@@ -143,19 +143,4 @@ public class SchedulerTestUtils {
 		
 		return set.size() == obj.length;
 	}
-	
-	public static boolean areSameSets(Collection<Object> set1, Collection<Object> set2) {
-		if (set1 == null || set2 == null) {
-			throw new IllegalArgumentException();
-		}
-		
-		HashSet<Object> set = new HashSet<Object>(set1);
-		for (Object o : set2) {
-			if (!set.remove(o)) {
-				return false;
-			}
-		}
-		
-		return set.isEmpty();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index ca09634..3307568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -24,14 +24,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -83,7 +82,7 @@ public class ResourceManagerITCase extends TestLogger {
 			jobManager.tell(
 				new RegistrationMessages.RegisterTaskManager(
 					resourceID,
-					Mockito.mock(InstanceConnectionInfo.class),
+					Mockito.mock(TaskManagerLocation.class),
 					null,
 					1),
 				me);

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 3371c49..bda4174 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -25,6 +25,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -46,12 +46,12 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
-
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
 import org.junit.Test;
+
 import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -105,7 +105,9 @@ public class TaskManagerComponentsStartupShutdownTest {
 					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
 					Option.<NettyConfig>empty(), 0, 0);
 
-			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
+			ResourceID taskManagerId = ResourceID.generate();
+			
+			final TaskManagerLocation connectionInfo = new TaskManagerLocation(taskManagerId, InetAddress.getLocalHost(), 10000);
 
 			final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
@@ -130,7 +132,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final Props tmProps = Props.create(
 				TaskManager.class,
 				tmConfig,
-				ResourceID.generate(),
+				taskManagerId,
 				connectionInfo,
 				memManager,
 				ioManager,