You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:30 UTC

[09/22] Rework the Taskmanager to a slot based model and remove legacy cloud code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
deleted file mode 100644
index 56b4eae..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Set;
-
-import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.ipc.RPC;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.net.NetUtils;
-import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
-import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-
-/**
- * An abstract instance represents a resource a {@link eu.stratosphere.nephele.taskmanager.TaskManager} runs on.
- * 
- */
-public abstract class AbstractInstance extends NetworkNode {
-
-	/**
-	 * The type of the instance.
-	 */
-	private final InstanceType instanceType;
-
-	/**
-	 * The connection info identifying the instance.
-	 */
-	private final InstanceConnectionInfo instanceConnectionInfo;
-
-	/**
-	 * The hardware description as reported by the instance itself.
-	 */
-	private final HardwareDescription hardwareDescription;
-
-	/**
-	 * Stores the RPC stub object for the instance's task manager.
-	 */
-	private TaskOperationProtocol taskManager = null;
-
-	/**
-	 * Constructs an abstract instance object.
-	 * 
-	 * @param instanceType
-	 *        the type of the instance
-	 * @param instanceConnectionInfo
-	 *        the connection info identifying the instance
-	 * @param parentNode
-	 *        the parent node in the network topology
-	 * @param networkTopology
-	 *        the network topology this node is a part of
-	 * @param hardwareDescription
-	 *        the hardware description provided by the instance itself
-	 */
-	public AbstractInstance(final InstanceType instanceType, final InstanceConnectionInfo instanceConnectionInfo,
-			final NetworkNode parentNode, final NetworkTopology networkTopology,
-			final HardwareDescription hardwareDescription) {
-		super((instanceConnectionInfo == null) ? null : instanceConnectionInfo.toString(), parentNode, networkTopology);
-		this.instanceType = instanceType;
-		this.instanceConnectionInfo = instanceConnectionInfo;
-		this.hardwareDescription = hardwareDescription;
-	}
-
-	/**
-	 * Creates or returns the RPC stub object for the instance's task manager.
-	 * 
-	 * @return the RPC stub object for the instance's task manager
-	 * @throws IOException
-	 *         thrown if the RPC stub object for the task manager cannot be created
-	 */
-	private TaskOperationProtocol getTaskManagerProxy() throws IOException {
-
-		if (this.taskManager == null) {
-
-			this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
-				new InetSocketAddress(getInstanceConnectionInfo().address(),
-					getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
-		}
-
-		return this.taskManager;
-	}
-
-	/**
-	 * Destroys and removes the RPC stub object for this instance's task manager.
-	 */
-	private void destroyTaskManagerProxy() {
-
-		if (this.taskManager != null) {
-			RPC.stopProxy(this.taskManager);
-			this.taskManager = null;
-		}
-	}
-
-	/**
-	 * Returns the type of the instance.
-	 * 
-	 * @return the type of the instance
-	 */
-	public final InstanceType getType() {
-		return this.instanceType;
-	}
-
-	/**
-	 * Returns the instance's connection information object.
-	 * 
-	 * @return the instance's connection information object
-	 */
-	public final InstanceConnectionInfo getInstanceConnectionInfo() {
-		return this.instanceConnectionInfo;
-	}
-
-	/**
-	 * Returns the instance's hardware description as reported by the instance itself.
-	 * 
-	 * @return the instance's hardware description
-	 */
-	public HardwareDescription getHardwareDescription() {
-		return this.hardwareDescription;
-	}
-
-	/**
-	 * Checks if all the libraries required to run the job with the given
-	 * job ID are available on this instance. Any libary that is missing
-	 * is transferred to the instance as a result of this call.
-	 * 
-	 * @param jobID
-	 *        the ID of the job whose libraries are to be checked for
-	 * @throws IOException
-	 *         thrown if an error occurs while checking for the libraries
-	 */
-	public synchronized void checkLibraryAvailability(final JobID jobID) throws IOException {
-
-		// Now distribute the required libraries for the job
-		String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
-
-		if (requiredLibraries == null) {
-			throw new IOException("No entry of required libraries for job " + jobID);
-		}
-
-		LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
-		request.setRequiredLibraries(requiredLibraries);
-
-		// Send the request
-		LibraryCacheProfileResponse response = null;
-		response = getTaskManagerProxy().getLibraryCacheProfile(request);
-
-		// Check response and transfer libraries if necessary
-		for (int k = 0; k < requiredLibraries.length; k++) {
-			if (!response.isCached(k)) {
-				LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
-				getTaskManagerProxy().updateLibraryCache(update);
-			}
-		}
-	}
-
-	/**
-	 * Submits a list of tasks to the instance's {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
-	 * 
-	 * @param tasks
-	 *        the list of tasks to be submitted
-	 * @return the result of the submission attempt
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the task
-	 */
-	public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks)
-			throws IOException {
-
-		return getTaskManagerProxy().submitTasks(tasks);
-	}
-
-	/**
-	 * Cancels the task identified by the given ID at the instance's
-	 * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
-	 * 
-	 * @param id
-	 *        the ID identifying the task to be canceled
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request or receiving the response
-	 * @return the result of the cancel attempt
-	 */
-	public synchronized TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
-
-		return getTaskManagerProxy().cancelTask(id);
-	}
-
-	/**
-	 * Kills the task identified by the given ID at the instance's
-	 * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
-	 *
-	 * @param id
-	 *        the ID identifying the task to be killed
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request or receiving the response
-	 * @return the result of the kill attempt
-	 */
-	public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
-		return getTaskManagerProxy().killTask(id);
-	}
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		// Fall back since dummy instances do not have a instanceConnectionInfo
-		if (this.instanceConnectionInfo == null) {
-			return super.equals(obj);
-		}
-
-		if (!(obj instanceof AbstractInstance)) {
-			return false;
-		}
-
-		final AbstractInstance abstractInstance = (AbstractInstance) obj;
-
-		return this.instanceConnectionInfo.equals(abstractInstance.getInstanceConnectionInfo());
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		// Fall back since dummy instances do not have a instanceConnectionInfo
-		if (this.instanceConnectionInfo == null) {
-			return super.hashCode();
-		}
-
-		return this.instanceConnectionInfo.hashCode();
-	}
-
-	/**
-	 * Triggers the remote task manager to print out the current utilization of its read and write buffers to its logs.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request
-	 */
-	public synchronized void logBufferUtilization() throws IOException {
-
-		getTaskManagerProxy().logBufferUtilization();
-	}
-
-	/**
-	 * Kills the task manager running on this instance. This method is mainly intended to test and debug Nephele's fault
-	 * tolerance mechanisms.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request
-	 */
-	public synchronized void killTaskManager() throws IOException {
-
-		getTaskManagerProxy().killTaskManager();
-	}
-
-	/**
-	 * Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
-	 * 
-	 * @param channelIDs
-	 *        the channel IDs identifying the cache entries to invalidate
-	 * @throws IOException
-	 *         thrown if an error occurs during this remote procedure call
-	 */
-	public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
-
-		getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
-	}
-
-	/**
-	 * Destroys all RPC stub objects attached to this instance.
-	 */
-	public synchronized void destroyProxies() {
-
-		destroyTaskManagerProxy();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
index eb0a835..7f2ad04 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedResource.java
@@ -23,7 +23,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
 /**
  * An allocated resource object unambiguously defines the
  * hardware resources which have been assigned to an {@link eu.stratosphere.nephele.executiongraph.ExecutionVertex} for
- * executing a task. The allocated resource is comprised of an {@link eu.stratosphere.nephele.instance.AbstractInstance}
+ * executing a task. The allocated resource is comprised of an {@link Instance}
  * which identifies the node the task is scheduled to run on as well as an
  * {@link eu.stratosphere.nephele.instance.AllocationID} which determines the resources the task is scheduled to
  * allocate within the node.
@@ -36,12 +36,7 @@ public final class AllocatedResource {
 	/**
 	 * The instance a task is scheduled to run on.
 	 */
-	private final AbstractInstance instance;
-
-	/**
-	 * The instance type this allocated resource represents.
-	 */
-	private final InstanceType instanceType;
+	private final Instance instance;
 
 	/**
 	 * The allocation ID identifying the resources within the instance
@@ -60,24 +55,20 @@ public final class AllocatedResource {
 	 * 
 	 * @param instance
 	 *        the instance a task is scheduled to run on.
-	 * @param instanceType
-	 *        the instance type this allocated resource represents
 	 * @param allocationID
 	 *        the allocation ID identifying the allocated resources within the instance
 	 */
-	public AllocatedResource(final AbstractInstance instance, final InstanceType instanceType,
-			final AllocationID allocationID) {
+	public AllocatedResource(final Instance instance, final AllocationID allocationID) {
 		this.instance = instance;
-		this.instanceType = instanceType;
 		this.allocationID = allocationID;
 	}
 
 	/**
 	 * Returns the instance a task is scheduled to run on.
-	 * 
+	 *
 	 * @return the instance a task is scheduled to run on
 	 */
-	public AbstractInstance getInstance() {
+	public Instance getInstance() {
 		return this.instance;
 	}
 
@@ -90,15 +81,6 @@ public final class AllocatedResource {
 		return this.allocationID;
 	}
 
-	/**
-	 * Returns the instance type this allocated resource represents.
-	 * 
-	 * @return the instance type this allocated resource represents
-	 */
-	public InstanceType getInstanceType() {
-		return this.instanceType;
-	}
-
 
 	@Override
 	public boolean equals(final Object obj) {
@@ -120,16 +102,6 @@ public final class AllocatedResource {
 				}
 			}
 
-			if (this.instanceType == null) {
-				if (allocatedResource.instance != null) {
-					return false;
-				}
-			} else {
-				if (!this.instanceType.equals(allocatedResource.getInstanceType())) {
-					return false;
-				}
-			}
-
 			return true;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java
new file mode 100644
index 0000000..0641944
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocatedSlot.java
@@ -0,0 +1,65 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * An allocated slot is a part of an instance which is assigned to a job.
+ * <p>
+ * This class is thread-safe.
+ * 
+ */
+public class AllocatedSlot {
+
+	/**
+	 * The allocation ID which identifies the resources occupied by this slot.
+	 */
+	private final AllocationID allocationID;
+
+	/**
+	 * The ID of the job this slice belongs to.
+	 */
+	private final JobID jobID;
+
+	/**
+	 * Creates a new allocated slice on the given hosting instance.
+	 * 
+	 * @param jobID
+	 *        the ID of the job this slice belongs to
+	 */
+	public AllocatedSlot(final JobID jobID) {
+
+		this.allocationID = new AllocationID();
+		this.jobID = jobID;
+	}
+
+	/**
+	 * Returns the allocation ID of this slice.
+	 * 
+	 * @return the allocation ID of this slice
+	 */
+	public AllocationID getAllocationID() {
+		return this.allocationID;
+	}
+
+	/**
+	 * Returns the ID of the job this allocated slice belongs to.
+	 * 
+	 * @return the ID of the job this allocated slice belongs to
+	 */
+	public JobID getJobID() {
+		return this.jobID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
index 3c83b80..3ed5013 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
@@ -17,8 +17,8 @@ import eu.stratosphere.nephele.AbstractID;
 
 /**
  * An allocation ID unambiguously identifies the allocated resources
- * within an {@link AbstractInstance}. The ID is necessary if an {@link InstanceManager} decides to partition
- * {@link AbstractInstance}s
+ * within an {@link Instance}. The ID is necessary if an {@link InstanceManager} decides to partition
+ * {@link Instance}s
  * without the knowledge of Nephele's scheduler.
  * 
  */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java
new file mode 100644
index 0000000..7d5f31b
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DefaultInstanceManager.java
@@ -0,0 +1,393 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.topology.NetworkNode;
+import eu.stratosphere.nephele.topology.NetworkTopology;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.TimerTask;
+import java.util.Timer;
+
+/**
+ * In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
+ * compute resources,
+ * provisioning available compute resources to the JobManager and keeping track of the availability of the utilized
+ * compute resources in order
+ * to report unexpected resource outages.
+ * 
+ */
+public class DefaultInstanceManager implements InstanceManager {
+
+	// ------------------------------------------------------------------------
+	// Internal Constants
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The log object used to report debugging and error information.
+	 */
+	private static final Log LOG = LogFactory.getLog(DefaultInstanceManager.class);
+
+	/**
+	 * Default duration after which a host is purged in case it did not send
+	 * a heart-beat message.
+	 */
+	private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
+
+	/**
+	 * The key to retrieve the clean up interval from the configuration.
+	 */
+	private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
+
+	// ------------------------------------------------------------------------
+	// Fields
+	// ------------------------------------------------------------------------
+
+	private final Object lock = new Object();
+
+	/**
+	 * Duration after which a host is purged in case it did not send a
+	 * heart-beat message.
+	 */
+	private final long cleanUpInterval;
+
+	/**
+	 * Set of hosts known to run a task manager that are thus able to execute
+	 * tasks.
+	 */
+	private final Map<InstanceConnectionInfo, Instance> registeredHosts;
+
+	/**
+	 * The network topology of the cluster.
+	 */
+	private final NetworkTopology networkTopology;
+
+	/**
+	 * Object that is notified if instances become available or vanish.
+	 */
+	private InstanceListener instanceListener;
+
+
+	private boolean shutdown;
+
+	/**
+	 * Periodic task that checks whether hosts have not sent their heart-beat
+	 * messages and purges the hosts in this case.
+	 */
+	private final TimerTask cleanupStaleMachines = new TimerTask() {
+
+		@Override
+		public void run() {
+
+			synchronized (DefaultInstanceManager.this.lock) {
+
+				final List<Map.Entry<InstanceConnectionInfo, Instance>> hostsToRemove =
+						new ArrayList<Map.Entry<InstanceConnectionInfo, Instance>>();
+
+				final Map<JobID, List<AllocatedResource>> staleResources = new HashMap<JobID, List<AllocatedResource>>();
+
+				// check all hosts whether they did not send heart-beat messages.
+				for (Map.Entry<InstanceConnectionInfo, Instance> entry : registeredHosts.entrySet()) {
+
+					final Instance host = entry.getValue();
+					if (!host.isStillAlive(cleanUpInterval)) {
+
+						// this host has not sent the heart-beat messages
+						// -> we terminate all instances running on this host and notify the jobs
+						final Collection<AllocatedSlot> slots = host.removeAllocatedSlots();
+						for (AllocatedSlot slot : slots) {
+
+							final JobID jobID = slot.getJobID();
+
+							List<AllocatedResource> staleResourcesOfJob = staleResources.get(jobID);
+							if (staleResourcesOfJob == null) {
+								staleResourcesOfJob = new ArrayList<AllocatedResource>();
+								staleResources.put(jobID, staleResourcesOfJob);
+							}
+
+							staleResourcesOfJob.add(new AllocatedResource(host,	slot.getAllocationID()));
+						}
+
+						hostsToRemove.add(entry);
+					}
+				}
+
+				registeredHosts.entrySet().removeAll(hostsToRemove);
+
+				final Iterator<Map.Entry<JobID, List<AllocatedResource>>> it = staleResources.entrySet().iterator();
+				while (it.hasNext()) {
+					final Map.Entry<JobID, List<AllocatedResource>> entry = it.next();
+					if (instanceListener != null) {
+						instanceListener.allocatedResourcesDied(entry.getKey(), entry.getValue());
+					}
+				}
+			}
+		}
+	};
+
+	// ------------------------------------------------------------------------
+	// Constructor and set-up
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Constructor.
+	 */
+	public DefaultInstanceManager() {
+
+		this.registeredHosts = new HashMap<InstanceConnectionInfo, Instance>();
+
+		long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
+
+		if (tmpCleanUpInterval < 10) { // Clean up interval must be at least ten seconds
+			LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
+					+ " secs.");
+			tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL;
+		}
+
+		this.cleanUpInterval = tmpCleanUpInterval;
+
+		this.networkTopology = NetworkTopology.createEmptyTopology();
+
+		// look every BASEINTERVAL milliseconds for crashed hosts
+		final boolean runTimerAsDaemon = true;
+		new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
+	}
+
+	@Override
+	public void shutdown() {
+		synchronized (this.lock) {
+			if (this.shutdown) {
+				return;
+			}
+
+			this.cleanupStaleMachines.cancel();
+
+			Iterator<Instance> it = this.registeredHosts.values().iterator();
+			while (it.hasNext()) {
+				it.next().destroyProxies();
+			}
+			this.registeredHosts.clear();
+
+			this.shutdown = true;
+		}
+	}
+
+	@Override
+	public void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException
+	{
+		synchronized (this.lock) {
+			// release the instance from the host
+			final Instance clusterInstance = allocatedResource.getInstance();
+			clusterInstance.releaseSlot(allocatedResource.getAllocationID());
+		}
+	}
+
+	/**
+	 * Creates a new {@link Instance} object to manage instances that can
+	 * be executed on that host.
+	 *
+	 * @param instanceConnectionInfo
+	 *        the connection information for the instance
+	 * @param hardwareDescription
+	 *        the hardware description provided by the new instance
+	 * @param numberOfSlots
+	 * 		  number of slots available on the instance
+	 * @return a new {@link Instance} object or <code>null</code> if the cluster instance could not be created
+	 */
+	private Instance createNewHost(final InstanceConnectionInfo instanceConnectionInfo,
+							final HardwareDescription hardwareDescription, int numberOfSlots) {
+
+		// Try to match new host with a stub host from the existing topology
+		String instanceName = instanceConnectionInfo.hostname();
+		NetworkNode parentNode = this.networkTopology.getRootNode();
+		NetworkNode currentStubNode = null;
+
+		// Try to match new host using the host name
+		while (true) {
+
+			currentStubNode = this.networkTopology.getNodeByName(instanceName);
+			if (currentStubNode != null) {
+				break;
+			}
+
+			final int pos = instanceName.lastIndexOf('.');
+			if (pos == -1) {
+				break;
+			}
+
+			/*
+			 * If host name is reported as FQDN, iterative remove parts
+			 * of the domain name until a match occurs or no more dots
+			 * can be found in the host name.
+			 */
+			instanceName = instanceName.substring(0, pos);
+		}
+
+		// Try to match the new host using the IP address
+		if (currentStubNode == null) {
+			instanceName = instanceConnectionInfo.address().toString();
+			instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
+			currentStubNode = this.networkTopology.getNodeByName(instanceName);
+		}
+
+		if (currentStubNode != null) {
+			/*
+			 * The instance name will be the same as the one of the stub node. That way
+			 * the stub now will be removed from the network topology and replaced be
+			 * the new node.
+			 */
+			if (currentStubNode.getParentNode() != null) {
+				parentNode = currentStubNode.getParentNode();
+			}
+			// Remove the stub node from the tree
+			currentStubNode.remove();
+		}
+
+		LOG.info("Creating instance for " + instanceConnectionInfo + ", parent is "
+				+ parentNode.getName());
+		final Instance host = new Instance(instanceConnectionInfo, parentNode,
+				this.networkTopology, hardwareDescription, numberOfSlots);
+
+		return host;
+	}
+
+	@Override
+	public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo) {
+
+		synchronized (this.lock) {
+			Instance host = registeredHosts.get(instanceConnectionInfo);
+
+			if(host == null){
+				LOG.error("Task manager with connection info " + instanceConnectionInfo + " has not been registered.");
+				return;
+			}
+
+			host.reportHeartBeat();
+		}
+	}
+
+	@Override
+	public void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
+									HardwareDescription hardwareDescription, int numberOfSlots){
+		synchronized(this.lock){
+			if(registeredHosts.containsKey(instanceConnectionInfo)){
+				LOG.error("Task manager with connection info " + instanceConnectionInfo + " has already been " +
+						"registered.");
+				return;
+			}
+
+			Instance host = createNewHost(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+
+			if(host == null){
+				LOG.error("Could not create a new host object for register task manager for connection info " +
+						instanceConnectionInfo);
+				return;
+			}
+
+			this.registeredHosts.put(instanceConnectionInfo, host);
+			LOG.info("New number of registered hosts is " + this.registeredHosts.size());
+
+			host.reportHeartBeat();
+		}
+	}
+
+	@Override
+	public void requestInstance(JobID jobID, Configuration conf,  int requiredSlots)
+			throws InstanceException
+	{
+
+		synchronized(this.lock) {
+			Iterator<Instance> clusterIterator = this.registeredHosts.values().iterator();
+			Instance instance = null;
+			List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
+			int allocatedSlots = 0;
+
+			while(clusterIterator.hasNext()) {
+				instance = clusterIterator.next();
+				while(instance.getNumberOfAvailableSlots() >0  && allocatedSlots < requiredSlots){
+					AllocatedResource resource = instance.allocateSlot(jobID);
+					allocatedResources.add(resource);
+					allocatedSlots++;
+				}
+			}
+
+			if(allocatedSlots < requiredSlots){
+				throw new InstanceException("Cannot allocate the required number of slots: " + requiredSlots + ".");
+			}
+
+			if (this.instanceListener != null) {
+				final InstanceNotifier instanceNotifier = new InstanceNotifier(
+						this.instanceListener, jobID, allocatedResources);
+				instanceNotifier.start();
+			}
+		}
+	}
+
+	@Override
+	public NetworkTopology getNetworkTopology(JobID jobID) {
+		return this.networkTopology;
+	}
+
+	@Override
+	public void setInstanceListener(InstanceListener instanceListener) {
+		synchronized (this.lock) {
+			this.instanceListener = instanceListener;
+		}
+	}
+
+	@Override
+	public Instance getInstanceByName(String name) {
+		if (name == null) {
+			throw new IllegalArgumentException("Argument name must not be null");
+		}
+
+		synchronized (this.lock) {
+			final Iterator<Instance> it = this.registeredHosts.values().iterator();
+			while (it.hasNext()) {
+				final Instance instance = it.next();
+				if (name.equals(instance.getName())) {
+					return instance;
+				}
+			}
+		}
+
+		return null;
+	}
+
+	@Override
+	public int getNumberOfTaskTrackers() {
+		return this.registeredHosts.size();
+	}
+
+	@Override
+	public int getNumberOfSlots() {
+		int slots = 0;
+
+		for(Instance instance: registeredHosts.values()){
+			slots += instance.getNumberOfSlots();
+		}
+
+		return slots;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
index 4e0f004..56f44c6 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/DummyInstance.java
@@ -14,32 +14,30 @@
 package eu.stratosphere.nephele.instance;
 
 /**
- * A DummyInstance is a stub implementation of the {@link AbstractInstance} interface.
+ * A DummyInstance is a stub implementation of the {@link Instance} interface.
  * Dummy instances are used to plan a job execution but must be replaced with
  * concrete instances before the job execution starts.
  * 
  */
-public class DummyInstance extends AbstractInstance {
+public class DummyInstance extends Instance {
 
 	private static int nextID = 0;
 
 	private final String name;
 
-	public static synchronized DummyInstance createDummyInstance(InstanceType type) {
+	public static synchronized DummyInstance createDummyInstance() {
 
-		return new DummyInstance(type, nextID++);
+		return new DummyInstance(nextID++);
 	}
 
 	/**
 	 * Constructs a new dummy instance of the given instance type.
 	 * 
-	 * @param type
-	 *        the type of the new dummy instance
 	 * @param id
 	 *        the ID of the dummy instance
 	 */
-	private DummyInstance(InstanceType type, int id) {
-		super(type, null, null, null, null);
+	private DummyInstance(int id) {
+		super(null, null, null, null, 0);
 
 		this.name = "DummyInstance_" + Integer.toString(id);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java
new file mode 100644
index 0000000..398a2a8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Hardware.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+/**
+ * Convenience class to extract hardware specifics of the computer executing this class
+ */
+public class Hardware {
+
+	public static int getNumberCPUCores() {
+		return Runtime.getRuntime().availableProcessors();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java
new file mode 100644
index 0000000..fa17745
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/Instance.java
@@ -0,0 +1,362 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collection;
+
+import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.ipc.RPC;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.net.NetUtils;
+import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
+import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
+import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
+import eu.stratosphere.nephele.topology.NetworkNode;
+import eu.stratosphere.nephele.topology.NetworkTopology;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+
+/**
+ * An instance represents a resource a {@link eu.stratosphere.nephele.taskmanager.TaskManager} runs on.
+ * 
+ */
+public class Instance extends NetworkNode {
+	/**
+	 * The connection info identifying the instance.
+	 */
+	private final InstanceConnectionInfo instanceConnectionInfo;
+
+	/**
+	 * The hardware description as reported by the instance itself.
+	 */
+	private final HardwareDescription hardwareDescription;
+
+	/**
+	 * Number of slots available on the node
+	 */
+	private final int numberOfSlots;
+
+	/**
+	 * Allocated slots on this instance
+	 */
+	private final Map<AllocationID, AllocatedSlot> allocatedSlots = new HashMap<AllocationID, AllocatedSlot>();
+
+	/**
+	 * Stores the RPC stub object for the instance's task manager.
+	 */
+	private TaskOperationProtocol taskManager = null;
+
+	/**
+	 * Time when last heat beat has been received from the task manager running on this instance.
+	 */
+	private long lastReceivedHeartBeat = System.currentTimeMillis();
+
+	/**
+	 * Constructs an abstract instance object.
+	 * 
+	 * @param instanceConnectionInfo
+	 *        the connection info identifying the instance
+	 * @param parentNode
+	 *        the parent node in the network topology
+	 * @param networkTopology
+	 *        the network topology this node is a part of
+	 * @param hardwareDescription
+	 *        the hardware description provided by the instance itself
+	 */
+	public Instance(final InstanceConnectionInfo instanceConnectionInfo,
+					final NetworkNode parentNode, final NetworkTopology networkTopology,
+					final HardwareDescription hardwareDescription, int numberOfSlots) {
+		super((instanceConnectionInfo == null) ? null : instanceConnectionInfo.toString(), parentNode, networkTopology);
+		this.instanceConnectionInfo = instanceConnectionInfo;
+		this.hardwareDescription = hardwareDescription;
+		this.numberOfSlots = numberOfSlots;
+	}
+
+	/**
+	 * Creates or returns the RPC stub object for the instance's task manager.
+	 * 
+	 * @return the RPC stub object for the instance's task manager
+	 * @throws IOException
+	 *         thrown if the RPC stub object for the task manager cannot be created
+	 */
+	private TaskOperationProtocol getTaskManagerProxy() throws IOException {
+
+		if (this.taskManager == null) {
+
+			this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
+				new InetSocketAddress(getInstanceConnectionInfo().address(),
+					getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+		}
+
+		return this.taskManager;
+	}
+
+	/**
+	 * Destroys and removes the RPC stub object for this instance's task manager.
+	 */
+	private void destroyTaskManagerProxy() {
+
+		if (this.taskManager != null) {
+			RPC.stopProxy(this.taskManager);
+			this.taskManager = null;
+		}
+	}
+
+	/**
+	 * Returns the instance's connection information object.
+	 * 
+	 * @return the instance's connection information object
+	 */
+	public final InstanceConnectionInfo getInstanceConnectionInfo() {
+		return this.instanceConnectionInfo;
+	}
+
+	/**
+	 * Returns the instance's hardware description as reported by the instance itself.
+	 * 
+	 * @return the instance's hardware description
+	 */
+	public HardwareDescription getHardwareDescription() {
+		return this.hardwareDescription;
+	}
+
+	/**
+	 * Checks if all the libraries required to run the job with the given
+	 * job ID are available on this instance. Any libary that is missing
+	 * is transferred to the instance as a result of this call.
+	 * 
+	 * @param jobID
+	 *        the ID of the job whose libraries are to be checked for
+	 * @throws IOException
+	 *         thrown if an error occurs while checking for the libraries
+	 */
+	public synchronized void checkLibraryAvailability(final JobID jobID) throws IOException {
+
+		// Now distribute the required libraries for the job
+		String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
+
+		if (requiredLibraries == null) {
+			throw new IOException("No entry of required libraries for job " + jobID);
+		}
+
+		LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
+		request.setRequiredLibraries(requiredLibraries);
+
+		// Send the request
+		LibraryCacheProfileResponse response = null;
+		response = getTaskManagerProxy().getLibraryCacheProfile(request);
+
+		// Check response and transfer libraries if necessary
+		for (int k = 0; k < requiredLibraries.length; k++) {
+			if (!response.isCached(k)) {
+				LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
+				getTaskManagerProxy().updateLibraryCache(update);
+			}
+		}
+	}
+
+	/**
+	 * Submits a list of tasks to the instance's {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+	 * 
+	 * @param tasks
+	 *        the list of tasks to be submitted
+	 * @return the result of the submission attempt
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the task
+	 */
+	public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks)
+			throws IOException {
+
+		return getTaskManagerProxy().submitTasks(tasks);
+	}
+
+	/**
+	 * Cancels the task identified by the given ID at the instance's
+	 * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+	 * 
+	 * @param id
+	 *        the ID identifying the task to be canceled
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the request or receiving the response
+	 * @return the result of the cancel attempt
+	 */
+	public synchronized TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
+
+		return getTaskManagerProxy().cancelTask(id);
+	}
+
+	/**
+	 * Kills the task identified by the given ID at the instance's
+	 * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+	 * 
+	 * @param id
+	 *        the ID identifying the task to be killed
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the request or receiving the response
+	 * @return the result of the kill attempt
+	 */
+	public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
+
+		return getTaskManagerProxy().killTask(id);
+	}
+
+	/**
+	 * Updates the time of last received heart beat to the current system time.
+	 */
+	public synchronized void reportHeartBeat() {
+		this.lastReceivedHeartBeat = System.currentTimeMillis();
+	}
+
+	/**
+	 * Returns whether the host is still alive.
+	 *
+	 * @param cleanUpInterval
+	 *        duration (in milliseconds) after which a host is
+	 *        considered dead if it has no received heat-beats.
+	 * @return <code>true</code> if the host has received a heat-beat before the <code>cleanUpInterval</code> duration
+	 *         has expired, <code>false</code> otherwise
+	 */
+	public synchronized boolean isStillAlive(final long cleanUpInterval) {
+
+		if (this.lastReceivedHeartBeat + cleanUpInterval < System.currentTimeMillis()) {
+			return false;
+		}
+		return true;
+	}
+
+
+	@Override
+	public boolean equals(final Object obj) {
+
+		// Fall back since dummy instances do not have a instanceConnectionInfo
+		if (this.instanceConnectionInfo == null) {
+			return super.equals(obj);
+		}
+
+		if (!(obj instanceof Instance)) {
+			return false;
+		}
+
+		final Instance abstractInstance = (Instance) obj;
+
+		return this.instanceConnectionInfo.equals(abstractInstance.getInstanceConnectionInfo());
+	}
+
+
+	@Override
+	public int hashCode() {
+
+		// Fall back since dummy instances do not have a instanceConnectionInfo
+		if (this.instanceConnectionInfo == null) {
+			return super.hashCode();
+		}
+
+		return this.instanceConnectionInfo.hashCode();
+	}
+
+	/**
+	 * Triggers the remote task manager to print out the current utilization of its read and write buffers to its logs.
+	 * 
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the request
+	 */
+	public synchronized void logBufferUtilization() throws IOException {
+
+		getTaskManagerProxy().logBufferUtilization();
+	}
+
+	/**
+	 * Kills the task manager running on this instance. This method is mainly intended to test and debug Nephele's fault
+	 * tolerance mechanisms.
+	 * 
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the request
+	 */
+	public synchronized void killTaskManager() throws IOException {
+
+		getTaskManagerProxy().killTaskManager();
+	}
+
+	/**
+	 * Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
+	 * 
+	 * @param channelIDs
+	 *        the channel IDs identifying the cache entries to invalidate
+	 * @throws IOException
+	 *         thrown if an error occurs during this remote procedure call
+	 */
+	public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
+		getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
+	}
+
+	/**
+	 * Destroys all RPC stub objects attached to this instance.
+	 */
+	public synchronized void destroyProxies() {
+
+		destroyTaskManagerProxy();
+
+	}
+
+	public int getNumberOfSlots() {
+		return numberOfSlots;
+	}
+
+	public int getNumberOfAvailableSlots() { return numberOfSlots - allocatedSlots.size(); }
+
+	public synchronized AllocatedResource allocateSlot(JobID jobID) throws InstanceException{
+		if(allocatedSlots.size() < numberOfSlots){
+			AllocatedSlot slot = new AllocatedSlot(jobID);
+
+			allocatedSlots.put(slot.getAllocationID(), slot);
+			return new AllocatedResource(this,slot.getAllocationID());
+		}else{
+			throw new InstanceException("Overbooking instance " + instanceConnectionInfo + ".");
+		}
+	}
+
+	public synchronized void releaseSlot(AllocationID allocationID) {
+		if(allocatedSlots.containsKey(allocationID)){
+			allocatedSlots.remove(allocationID);
+		}else{
+			throw new RuntimeException("There is no slot registered with allocation ID " + allocationID + ".");
+		}
+	}
+
+	public Collection<AllocatedSlot> getAllocatedSlots() {
+		return allocatedSlots.values();
+	}
+
+	public Collection<AllocatedSlot> removeAllocatedSlots() {
+		Collection<AllocatedSlot> slots = new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
+
+		for(AllocatedSlot slot : slots){
+			releaseSlot(slot.getAllocationID());
+		}
+
+		return slots;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
index a1015b5..00795f4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceManager.java
@@ -13,157 +13,32 @@
 
 package eu.stratosphere.nephele.instance;
 
-import java.util.List;
-import java.util.Map;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 
-/**
- * In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
- * compute resources,
- * provisioning available compute resources to the JobManager and keeping track of the availability of the utilized
- * compute resources in order
- * to report unexpected resource outages.
- * 
- */
 public interface InstanceManager {
 
-	/**
-	 * Requests an instance of the provided instance type from the instance manager.
-	 * 
-	 * @param jobID
-	 *        the ID of the job this instance is requested for
-	 * @param conf
-	 *        a configuration object including additional request information (e.g. credentials)
-	 * @param instanceRequestMap
-	 *        a map specifying the instances requested by this call
-	 * @param count
-	 *        the number of instances
-	 * @throws InstanceException
-	 *         thrown if an error occurs during the instance request
-	 */
-	void requestInstance(JobID jobID, Configuration conf, InstanceRequestMap instanceRequestMap,
-			List<String> splitAffinityList) throws InstanceException;
 
-	/**
-	 * Releases an allocated resource from a job.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the instance has been used for
-	 * @param conf
-	 *        a configuration object including additional release information (e.g. credentials)
-	 * @param allocatedResource
-	 *        the allocated resource to be released
-	 * @throws InstanceException
-	 *         thrown if an error occurs during the release process
-	 */
-	void releaseAllocatedResource(JobID jobID, Configuration conf, AllocatedResource allocatedResource)
-			throws InstanceException;
-
-	/**
-	 * Suggests a suitable instance type according to the provided hardware characteristics.
-	 * 
-	 * @param minNumComputeUnits
-	 *        the minimum number of compute units
-	 * @param minNumCPUCores
-	 *        the minimum number of CPU cores
-	 * @param minMemorySize
-	 *        the minimum number of main memory (in MB)
-	 * @param minDiskCapacity
-	 *        the minimum hard disk capacity (in GB)
-	 * @param maxPricePerHour
-	 *        the maximum price per hour for the instance
-	 * @return the instance type matching the requested hardware profile best or <code>null</code> if no such instance
-	 *         type is available
-	 */
-	InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores, int minMemorySize,
-			int minDiskCapacity, int maxPricePerHour);
+	void shutdown();
 
-	/**
-	 * Reports a heart beat message of an instance.
-	 * 
-	 * @param instanceConnectionInfo
-	 *        the {@link InstanceConnectionInfo} object attached to the heart beat message
-	 * @param hardwareDescription
-	 *        a hardware description with details on the instance's compute resources.
-	 */
-	void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription);
+	void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException;
 
-	/**
-	 * Translates the name of an instance type to the corresponding instance type object.
-	 * 
-	 * @param instanceTypeName
-	 *        the name of the instance type
-	 * @return the instance type object matching the name or <code>null</code> if no such instance type exists
-	 */
-	InstanceType getInstanceTypeByName(String instanceTypeName);
+	void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo);
 
-	/**
-	 * Returns the default instance type used by the instance manager.
-	 * 
-	 * @return the default instance type
-	 */
-	InstanceType getDefaultInstanceType();
+	void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
+									HardwareDescription hardwareDescription, int numberOfSlots);
+	void requestInstance(JobID jobID, Configuration conf,  int requiredSlots)
+			throws InstanceException;
 
-	/**
-	 * Returns the network topology for the job with the given ID. The network topology
-	 * for the job might only be an excerpt of the overall network topology. It only
-	 * includes those instances as leaf nodes which are really allocated for the
-	 * execution of the job.
-	 * 
-	 * @param jobID
-	 *        the ID of the job to get the topology for
-	 * @return the network topology for the job
-	 */
 	NetworkTopology getNetworkTopology(JobID jobID);
 
-	/**
-	 * Sets the {@link InstanceListener} object which is supposed to be
-	 * notified about instance availability and deaths.
-	 * 
-	 * @param instanceListener
-	 *        the instance listener to set for this instance manager
-	 */
 	void setInstanceListener(InstanceListener instanceListener);
 
-	/**
-	 * Returns a map of all instance types which are currently available to Nephele. The map contains a description of
-	 * the hardware characteristics for each instance type as provided in the configuration file. Moreover, it contains
-	 * the actual hardware description as reported by task managers running on the individual instances. If available,
-	 * the map also contains the maximum number instances Nephele can allocate of each instance type (i.e. if no other
-	 * job occupies instances).
-	 * 
-	 * @return a list of all instance types available to Nephele
-	 */
-	Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes();
-
-	/**
-	 * Returns the {@link AbstractInstance} with the given name.
-	 * 
-	 * @param name
-	 *        the name of the instance
-	 * @return the instance with the given name or <code>null</code> if no such instance could be found
-	 */
-	AbstractInstance getInstanceByName(String name);
+	Instance getInstanceByName(String name);
 
-	/**
-	 * Cancels all pending instance requests that might still exist for the job with the given ID.
-	 * 
-	 * @param jobID
-	 *        the ID of the job to cancel the pending instance requests for
-	 */
-	void cancelPendingRequests(JobID jobID);
-
-	/**
-	 * Shuts the instance manager down and stops all its internal processes.
-	 */
-	void shutdown();
-
-	/**
-	 * 
-	 * @return the number of available (registered) TaskTrackers
-	 */
 	int getNumberOfTaskTrackers();
+
+	int getNumberOfSlots();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java
new file mode 100644
index 0000000..2df3d3d
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceNotifier.java
@@ -0,0 +1,71 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+import java.util.List;
+
+import eu.stratosphere.nephele.instance.AllocatedResource;
+import eu.stratosphere.nephele.instance.InstanceListener;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * This class is an auxiliary class to send the notification
+ * about the availability of an {@link eu.stratosphere.nephele.instance.Instance} to the given {@link
+ * InstanceListener} object. The notification must be sent from
+ * a separate thread, otherwise the atomic operation of requesting an instance
+ * for a vertex and switching to the state ASSIGNING could not be guaranteed.
+ * This class is thread-safe.
+ * 
+ */
+public class InstanceNotifier extends Thread {
+
+	/**
+	 * The {@link InstanceListener} object to send the notification to.
+	 */
+	private final InstanceListener instanceListener;
+
+	/**
+	 * The ID of the job the notification refers to.
+	 */
+	private final JobID jobID;
+
+	/**
+	 * The allocated resources the notification refers to.
+	 */
+	private final List<AllocatedResource> allocatedResources;
+
+	/**
+	 * Constructs a new instance notifier object.
+	 * 
+	 * @param instanceListener
+	 *        the listener to send the notification to
+	 * @param jobID
+	 *        the ID of the job the notification refers to
+	 * @param allocatedResources
+	 *        the resources with has been allocated for the job
+	 */
+	public InstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
+							final List<AllocatedResource> allocatedResources) {
+		this.instanceListener = instanceListener;
+		this.jobID = jobID;
+		this.allocatedResources = allocatedResources;
+	}
+
+
+	@Override
+	public void run() {
+
+		this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java
deleted file mode 100644
index 4167f67..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceRequestMap.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * An instance request map specifies the required types of instances to run a specific job and the respective number
- * thereof. For each instance type it is possible to specify the minimum number of instances required to run the job. If
- * the {@link InstanceManager} cannot manage to provide at least this minimum numbers of instances for the given type,
- * the job will be rejected.
- * <p>
- * In addition, is it also possible to specify the optimal number of instances for a particular instance type. The
- * {@link InstanceManager} will try to provide this optimal number of instances, but will also start the job with less
- * instances.
- * <p>
- * This class is not thread-safe.
- * 
- */
-public final class InstanceRequestMap {
-
-	/**
-	 * The map holding the minimum number of instances to be requested for each instance type.
-	 */
-	private final Map<InstanceType, Integer> minimumMap = new HashMap<InstanceType, Integer>();
-
-	/**
-	 * The map holding the maximum number of instances to be requested for each instance type.
-	 */
-	private final Map<InstanceType, Integer> maximumMap = new HashMap<InstanceType, Integer>();
-
-	/**
-	 * Sets the minimum number of instances to be requested from the given instance type.
-	 * 
-	 * @param instanceType
-	 *        the type of instance to request
-	 * @param number
-	 *        the minimum number of instances to request
-	 */
-	public void setMinimumNumberOfInstances(final InstanceType instanceType, final int number) {
-
-		this.minimumMap.put(instanceType, Integer.valueOf(number));
-	}
-
-	/**
-	 * Sets the maximum number of instances to be requested from the given instance type.
-	 * 
-	 * @param instanceType
-	 *        the type of instance to request
-	 * @param number
-	 *        the maximum number of instances to request
-	 */
-	public void setMaximumNumberOfInstances(final InstanceType instanceType, final int number) {
-
-		this.maximumMap.put(instanceType, Integer.valueOf(number));
-	}
-
-	/**
-	 * Sets both the minimum and the maximum number of instances to be requested from the given instance type.
-	 * 
-	 * @param instanceType
-	 *        the type of instance to request
-	 * @param number
-	 *        the minimum and the maximum number of instances to request
-	 */
-	public void setNumberOfInstances(final InstanceType instanceType, final int number) {
-
-		setMinimumNumberOfInstances(instanceType, number);
-		setMaximumNumberOfInstances(instanceType, number);
-	}
-
-	/**
-	 * Returns the minimum number of instances to be requested from the given instance type.
-	 * 
-	 * @param instanceType
-	 *        the type of instance to request
-	 * @return the minimum number of instances to be requested from the given instance type
-	 */
-	public int getMinimumNumberOfInstances(final InstanceType instanceType) {
-
-		final Integer val = this.minimumMap.get(instanceType);
-		if (val != null) {
-			return val.intValue();
-		}
-
-		return 0;
-	}
-
-	/**
-	 * Returns the maximum number of instances to be requested from the given instance type.
-	 * 
-	 * @param instanceType
-	 *        the type of instance to request
-	 * @return the maximum number of instances to be requested from the given instance type
-	 */
-	public int getMaximumNumberOfInstances(final InstanceType instanceType) {
-
-		final Integer val = this.maximumMap.get(instanceType);
-		if (val != null) {
-			return val.intValue();
-		}
-
-		return 0;
-	}
-
-	/**
-	 * Checks if this instance request map is empty, i.e. neither contains an entry for the minimum or maximum number of
-	 * instances to be requested for any instance type.
-	 * 
-	 * @return <code>true</code> if the map is empty, <code>false</code> otherwise
-	 */
-	public boolean isEmpty() {
-
-		if (!this.maximumMap.isEmpty()) {
-			return false;
-		}
-
-		if (!this.minimumMap.isEmpty()) {
-			return false;
-		}
-
-		return true;
-	}
-
-	/**
-	 * Returns an {@link Iterator} object which allows to traverse the minimum number of instances to be requested for
-	 * each instance type.
-	 * 
-	 * @return an iterator to traverse the minimum number of instances to be requested for each instance type
-	 */
-	public Iterator<Map.Entry<InstanceType, Integer>> getMaximumIterator() {
-
-		return this.maximumMap.entrySet().iterator();
-	}
-
-	/**
-	 * Returns an {@link Iterator} object which allows to traverse the maximum number of instances to be requested for
-	 * each instance type.
-	 * 
-	 * @return an iterator to traverse the maximum number of instances to be requested for each instance type
-	 */
-	public Iterator<Map.Entry<InstanceType, Integer>> getMinimumIterator() {
-
-		return this.minimumMap.entrySet().iterator();
-	}
-
-	/**
-	 * Returns the number of different instance types stored in this request map.
-	 * 
-	 * @return the number of different instance types stored in this request map
-	 */
-	public int size() {
-
-		final int s = this.maximumMap.size();
-
-		if (s != this.minimumMap.size()) {
-			throw new IllegalStateException("InstanceRequestMap is in an inconsistent state");
-		}
-
-		return s;
-	}
-
-	/**
-	 * Clears the instance request map.
-	 */
-	public void clear() {
-
-		this.maximumMap.clear();
-		this.minimumMap.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java
deleted file mode 100644
index f2bb4e5..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.io.StringRecord;
-
-/**
- * An instance type describes the hardware resources a task manager runs on. According
- * to its type an instance has a specific number of CPU cores, computation units, a certain
- * amount of main memory and disk space. In addition, it has a specific price per hour.
- * 
- */
-public final class InstanceType implements IOReadableWritable {
-
-	/**
-	 * The identifier for this instance type.
-	 */
-	private String identifier;
-
-	/**
-	 * The number of computational units of this instance type.
-	 * A computational unit is a virtual compute capacity. A host with a
-	 * single-core 2 GHz CPU may possess 20 compute units (1*20), while a
-	 * dual-core 2.5 GHz CPU may possess 50 compute units (2*25). The
-	 * specified number of compute units expresses the fraction of the
-	 * CPU capacity promised to a user.
-	 */
-	private int numberOfComputeUnits = 0;
-
-	/**
-	 * The number of CPU cores of this instance type.
-	 */
-	private int numberOfCores = 0;
-
-	/**
-	 * The amount of main memory of this instance type (in MB).
-	 */
-	private int memorySize = 0;
-
-	/**
-	 * The disk capacity of this instance type (in GB).
-	 */
-	private int diskCapacity = 0;
-
-	/**
-	 * The price per hour that is charged for running instances of this type.
-	 */
-	private int pricePerHour = 0;
-
-	/**
-	 * Public constructor required for the serialization process.
-	 */
-	public InstanceType() {
-	}
-
-	/**
-	 * Creates a new instance type.
-	 * 
-	 * @param identifier
-	 *        identifier for this instance type
-	 * @param numberOfComputeUnits
-	 *        number of computational units of this instance type
-	 * @param numberOfCores
-	 *        number of CPU cores of this instance type
-	 * @param memorySize
-	 *        amount of main memory of this instance type (in MB)
-	 * @param diskCapacity
-	 *        disk capacity of this instance type (in GB)
-	 * @param pricePerHour
-	 *        price per hour that is charged for running instances of this type
-	 */
-	InstanceType(final String identifier, final int numberOfComputeUnits, final int numberOfCores,
-			final int memorySize,
-			final int diskCapacity, final int pricePerHour) {
-
-		this.identifier = identifier;
-		this.numberOfComputeUnits = numberOfComputeUnits;
-		this.numberOfCores = numberOfCores;
-		this.memorySize = memorySize;
-		this.diskCapacity = diskCapacity;
-		this.pricePerHour = pricePerHour;
-	}
-
-	/**
-	 * Returns the instance type's number of computational units.
-	 * 
-	 * @return the instance type's number of computational units
-	 */
-	public int getNumberOfComputeUnits() {
-		return this.numberOfComputeUnits;
-	}
-
-	/**
-	 * Returns the instance type's number of CPU cores.
-	 * 
-	 * @return the instance type's number of CPU cores
-	 */
-	public int getNumberOfCores() {
-		return this.numberOfCores;
-	}
-
-	/**
-	 * Returns the instance type's amount of main memory.
-	 * 
-	 * @return the instance type's amount of main memory
-	 */
-	public int getMemorySize() {
-		return this.memorySize;
-	}
-
-	/**
-	 * Returns the instance type's disk capacity.
-	 * 
-	 * @return the instance type's disk capacity
-	 */
-	public int getDiskCapacity() {
-		return this.diskCapacity;
-	}
-
-	/**
-	 * Returns the instance type's price per hour.
-	 * 
-	 * @return the instance type's price per hour
-	 */
-	public int getPricePerHour() {
-		return this.pricePerHour;
-	}
-
-	/**
-	 * Returns the instance type's identifier.
-	 * 
-	 * @return the instance type's identifier
-	 */
-	public String getIdentifier() {
-		return this.identifier;
-	}
-
-
-	@Override
-	public String toString() {
-
-		final StringBuilder bld = new StringBuilder(32);
-		bld.append(this.identifier);
-		bld.append(' ');
-		bld.append('(');
-		bld.append(this.numberOfComputeUnits);
-		bld.append(',');
-		bld.append(this.numberOfCores);
-		bld.append(',');
-		bld.append(this.memorySize);
-		bld.append(',');
-		bld.append(this.diskCapacity);
-		bld.append(',');
-		bld.append(this.pricePerHour);
-		bld.append(')');
-
-		return bld.toString();
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		StringRecord.writeString(out, this.identifier);
-		out.writeInt(this.numberOfComputeUnits);
-		out.writeInt(this.numberOfCores);
-		out.writeInt(this.memorySize);
-		out.writeInt(this.diskCapacity);
-		out.writeInt(this.pricePerHour);
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		this.identifier = StringRecord.readString(in);
-		this.numberOfComputeUnits = in.readInt();
-		this.numberOfCores = in.readInt();
-		this.memorySize = in.readInt();
-		this.diskCapacity = in.readInt();
-		this.pricePerHour = in.readInt();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java
deleted file mode 100644
index ce0a694..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * An instance type description provides details of instance type. Is can comprise both the hardware description from
- * the instance type description (as provided by the operator/administrator of the instance) as well as the actual
- * hardware description which has been determined on the compute instance itself.
- * 
- */
-public final class InstanceTypeDescription implements IOReadableWritable {
-
-	/**
-	 * The instance type.
-	 */
-	private InstanceType instanceType = null;
-
-	/**
-	 * The hardware description as created by the {@link InstanceManager}.
-	 */
-	private HardwareDescription hardwareDescription = null;
-
-	/**
-	 * The maximum number of available instances of this type.
-	 */
-	private int maximumNumberOfAvailableInstances = 0;
-
-	/**
-	 * Public default constructor required for serialization process.
-	 */
-	public InstanceTypeDescription() {
-	}
-
-	/**
-	 * Constructs a new instance type description.
-	 * 
-	 * @param instanceType
-	 *        the instance type
-	 * @param hardwareDescription
-	 *        the hardware description as created by the {@link InstanceManager}
-	 * @param maximumNumberOfAvailableInstances
-	 *        the maximum number of available instances of this type
-	 */
-	InstanceTypeDescription(final InstanceType instanceType, final HardwareDescription hardwareDescription,
-			final int maximumNumberOfAvailableInstances) {
-
-		this.instanceType = instanceType;
-		this.hardwareDescription = hardwareDescription;
-		this.maximumNumberOfAvailableInstances = maximumNumberOfAvailableInstances;
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		if (this.instanceType == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			this.instanceType.write(out);
-		}
-
-		if (this.hardwareDescription == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			this.hardwareDescription.write(out);
-		}
-
-		out.writeInt(this.maximumNumberOfAvailableInstances);
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		if (in.readBoolean()) {
-			this.instanceType = new InstanceType();
-			this.instanceType.read(in);
-		} else {
-			this.instanceType = null;
-		}
-
-		if (in.readBoolean()) {
-			this.hardwareDescription = new HardwareDescription();
-			this.hardwareDescription.read(in);
-		}
-
-		this.maximumNumberOfAvailableInstances = in.readInt();
-	}
-
-	/**
-	 * Returns the hardware description as created by the {@link InstanceManager}.
-	 * 
-	 * @return the instance's hardware description or <code>null</code> if no description is available
-	 */
-	public HardwareDescription getHardwareDescription() {
-		return this.hardwareDescription;
-	}
-
-	/**
-	 * Returns the instance type as determined by the {@link InstanceManager}.
-	 * 
-	 * @return the instance type
-	 */
-	public InstanceType getInstanceType() {
-		return this.instanceType;
-	}
-
-	/**
-	 * Returns the maximum number of instances the {@link InstanceManager} can at most allocate of this instance type
-	 * (i.e. when no other jobs are occupying any resources).
-	 * 
-	 * @return the maximum number of instances of this type or <code>-1</code> if the number is unknown to the
-	 *         {@link InstanceManager}
-	 */
-	public int getMaximumNumberOfAvailableInstances() {
-		return this.maximumNumberOfAvailableInstances;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java
deleted file mode 100644
index 2b3e7db..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-/**
- * This factory produces {@link InstanceTypeDescription} objects.
- * <p>
- * This class is thread-safe.
- * 
- */
-public class InstanceTypeDescriptionFactory {
-
-	/**
-	 * Private constructor, so class cannot be instantiated.
-	 */
-	private InstanceTypeDescriptionFactory() {
-	}
-
-	/**
-	 * Constructs a new {@link InstaceTypeDescription} object.
-	 * 
-	 * @param instanceType
-	 *        the instance type
-	 * @param hardwareDescription
-	 *        the hardware description as created by the {@link InstanceManager}
-	 * @param numberOfAvailableInstances
-	 *        the number of available instances of this type
-	 * @return the instance type description
-	 */
-	public static InstanceTypeDescription construct(InstanceType instanceType, HardwareDescription hardwareDescription,
-			int numberOfAvailableInstances) {
-
-		return new InstanceTypeDescription(instanceType, hardwareDescription, numberOfAvailableInstances);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java
deleted file mode 100644
index ff501c4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * This factory constructs {@link InstanceType} objects.
- * 
- */
-public class InstanceTypeFactory {
-
-	/**
-	 * The logger used to report errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(InstanceTypeFactory.class);
-
-	/**
-	 * The pattern used to parse the hardware descriptions of instance types.
-	 */
-	private static Pattern INSTANCE_TYPE_PATTERN = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$");
-
-	/**
-	 * Private constructor, so class cannot be instantiated.
-	 */
-	private InstanceTypeFactory() {
-	}
-
-	/**
-	 * Constructs an {@link InstanceType} object by parsing a hardware description string.
-	 * 
-	 * @param description
-	 *        the hardware description reflected by this instance type
-	 * @return an instance type reflecting the given hardware description or <code>null</code> if the description cannot
-	 *         be parsed
-	 */
-	public static InstanceType constructFromDescription(String description) {
-
-		final Matcher m = INSTANCE_TYPE_PATTERN.matcher(description);
-		if (!m.matches()) {
-			LOG.error("Cannot extract instance type from string " + description);
-			return null;
-		}
-
-		final String identifier = m.group(1);
-		final int numComputeUnits = Integer.parseInt(m.group(2));
-		final int numCores = Integer.parseInt(m.group(3));
-		final int memorySize = Integer.parseInt(m.group(4));
-		final int diskCapacity = Integer.parseInt(m.group(5));
-		final int pricePerHour = Integer.parseInt(m.group(6));
-
-		return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour);
-	}
-
-	/**
-	 * Constructs an {@link InstanceType} from the given parameters.
-	 * 
-	 * @param identifier
-	 *        identifier for this instance type
-	 * @param numberOfComputeUnits
-	 *        number of computational units of this instance type
-	 * @param numberOfCores
-	 *        number of CPU cores of this instance type
-	 * @param memorySize
-	 *        amount of main memory of this instance type (in MB)
-	 * @param diskCapacity
-	 *        disk capacity of this instance type (in GB)
-	 * @param pricePerHour
-	 *        price per hour that is charged for running instances of this type
-	 */
-	public static InstanceType construct(String identifier, int numberOfComputeUnits, int numberOfCores,
-			int memorySize, int diskCapacity, int pricePerHour) {
-
-		return new InstanceType(identifier, numberOfComputeUnits, numberOfCores, memorySize, diskCapacity, pricePerHour);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java
new file mode 100644
index 0000000..1576649
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/LocalInstanceManager.java
@@ -0,0 +1,60 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.instance;
+
+
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.nephele.taskmanager.TaskManager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LocalInstanceManager extends DefaultInstanceManager {
+	
+	private List<TaskManager> taskManagers = new ArrayList<TaskManager>();
+
+	public LocalInstanceManager() throws Exception{
+		int numTaskManager = GlobalConfiguration.getInteger(ConfigConstants
+				.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+
+		ExecutionMode execMode = numTaskManager == 1 ? ExecutionMode.LOCAL : ExecutionMode.CLUSTER;
+		
+		for (int i=0; i < numTaskManager; i++){
+			Configuration tm = new Configuration();
+			int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+					ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
+			int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+					ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+			tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
+			tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
+
+			GlobalConfiguration.includeConfiguration(tm);
+
+			taskManagers.add(new TaskManager(execMode));
+		}
+	}
+
+	@Override
+	public void shutdown(){
+		for(TaskManager taskManager: taskManagers){
+			taskManager.shutdown();
+		}
+
+		super.shutdown();
+	}
+}