You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:29 UTC

[08/22] Rework the Taskmanager to a slot based model and remove legacy cloud code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java
deleted file mode 100644
index 88d71e0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/AllocatedSlice.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * An allocated slice is a part of an instance which is assigned to a job.
- * <p>
- * This class is thread-safe.
- * 
- */
-class AllocatedSlice {
-
-	/**
-	 * The allocation ID which identifies the resources occupied by this slice.
-	 */
-	private final AllocationID allocationID;
-
-	/**
-	 * The machine hosting the slice.
-	 */
-	private final ClusterInstance hostingInstance;
-
-	/**
-	 * The type describing the characteristics of the allocated slice.
-	 */
-	private final InstanceType type;
-
-	/**
-	 * The ID of the job this slice belongs to.
-	 */
-	private final JobID jobID;
-
-	/**
-	 * Time when this machine has been allocated, in milliseconds (see {@link System#currentTimeMillis()}).
-	 */
-	private final long allocationTime;
-
-	/**
-	 * Creates a new allocated slice on the given hosting instance.
-	 * 
-	 * @param hostingInstance
-	 *        the instance hosting the slice
-	 * @param type
-	 *        the type describing the characteristics of the allocated slice
-	 * @param jobID
-	 *        the ID of the job this slice belongs to
-	 * @param allocationTime
-	 *        the time the instance was allocated
-	 */
-	public AllocatedSlice(final ClusterInstance hostingInstance, final InstanceType type, final JobID jobID,
-			final long allocationTime) {
-
-		this.allocationID = new AllocationID();
-		this.hostingInstance = hostingInstance;
-		this.type = type;
-		this.jobID = jobID;
-		this.allocationTime = allocationTime;
-	}
-
-	/**
-	 * Returns the allocation ID of this slice.
-	 * 
-	 * @return the allocation ID of this slice
-	 */
-	public AllocationID getAllocationID() {
-		return this.allocationID;
-	}
-
-	/**
-	 * The type describing the characteristics of
-	 * this allocated slice.
-	 * 
-	 * @return the type describing the characteristics of the slice
-	 */
-	public InstanceType getType() {
-		return this.type;
-	}
-
-	/**
-	 * Returns the time the instance was allocated.
-	 * 
-	 * @return the time the instance was allocated
-	 */
-	public long getAllocationTime() {
-		return this.allocationTime;
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slice belongs to.
-	 * 
-	 * @return the ID of the job this allocated slice belongs to
-	 */
-	public JobID getJobID() {
-		return this.jobID;
-	}
-
-	/**
-	 * Returns the instance hosting this slice.
-	 * 
-	 * @return the instance hosting this slice
-	 */
-	public ClusterInstance getHostingInstance() {
-		return this.hostingInstance;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java
deleted file mode 100644
index 5c50bd3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-
-/**
- * Representation of a host of a compute cluster.
- * <p>
- * This class is thread-safe.
- * 
- */
-class ClusterInstance extends AbstractInstance {
-
-	/**
-	 * A map of slices allocated on this host.
-	 */
-	private final Map<AllocationID, AllocatedSlice> allocatedSlices = new HashMap<AllocationID, AllocatedSlice>();
-
-	/**
-	 * The remaining capacity of this host that can be used by instances.
-	 */
-	private InstanceType remainingCapacity;
-
-	/**
-	 * Time when the last heart beat has been received from the task manager running on this instance.
-	 */
-	private long lastReceivedHeartBeat = System.currentTimeMillis();
-
-	/**
-	 * Constructs a new cluster instance.
-	 * 
-	 * @param instanceConnectionInfo
-	 *        the instance connection info identifying the host
-	 * @param capacity
-	 *        capacity of this host
-	 * @param parentNode
-	 *        the parent node of this node in the network topology
-	 * @param networkTopology
-	 *        the network topology this node is part of
-	 * @param hardwareDescription
-	 *        the hardware description reported by the instance itself
-	 */
-	public ClusterInstance(final InstanceConnectionInfo instanceConnectionInfo, final InstanceType capacity,
-			final NetworkNode parentNode, final NetworkTopology networkTopology,
-			final HardwareDescription hardwareDescription) {
-
-		super(capacity, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);
-
-		this.remainingCapacity = capacity;
-	}
-
-	/**
-	 * Updates the time of last received heart beat to the current system time.
-	 */
-	synchronized void reportHeartBeat() {
-		this.lastReceivedHeartBeat = System.currentTimeMillis();
-	}
-
-	/**
-	 * Returns whether the host is still alive.
-	 * 
-	 * @param cleanUpInterval
-	 *        duration (in milliseconds) after which a host is
-	 *        considered dead if it has not received any heart-beats.
-	 * @return <code>true</code> if the host has received a heart-beat before the <code>cleanUpInterval</code> duration
-	 *         has expired, <code>false</code> otherwise
-	 */
-	synchronized boolean isStillAlive(final long cleanUpInterval) {
-
-		if (this.lastReceivedHeartBeat + cleanUpInterval < System.currentTimeMillis()) {
-			return false;
-		}
-		return true;
-	}
-
-	/**
-	 * Tries to create a new slice on this instance.
-	 * 
-	 * @param reqType
-	 *        the type describing the hardware characteristics of the slice
-	 * @param jobID
-	 *        the ID of the job the new slice belongs to
-	 * @return a new {@link AllocatedSlice} object if a slice with the given hardware characteristics could
-	 *         still be accommodated on this instance or <code>null</code> if the instance's remaining resources
-	 *         were insufficient to host the desired slice
-	 */
-	synchronized AllocatedSlice createSlice(final InstanceType reqType, final JobID jobID) {
-
-		// check whether we can accommodate the instance
-		if (remainingCapacity.getNumberOfComputeUnits() >= reqType.getNumberOfComputeUnits()
-			&& remainingCapacity.getNumberOfCores() >= reqType.getNumberOfCores()
-			&& remainingCapacity.getMemorySize() >= reqType.getMemorySize()
-			&& remainingCapacity.getDiskCapacity() >= reqType.getDiskCapacity()) {
-
-			// reduce available capacity by what has been requested
-			remainingCapacity = InstanceTypeFactory.construct(remainingCapacity.getIdentifier(), remainingCapacity
-				.getNumberOfComputeUnits()
-				- reqType.getNumberOfComputeUnits(), remainingCapacity.getNumberOfCores() - reqType.getNumberOfCores(),
-				remainingCapacity.getMemorySize() - reqType.getMemorySize(), remainingCapacity.getDiskCapacity()
-					- reqType.getDiskCapacity(), remainingCapacity.getPricePerHour());
-
-			final long allocationTime = System.currentTimeMillis();
-
-			final AllocatedSlice slice = new AllocatedSlice(this, reqType, jobID, allocationTime);
-			this.allocatedSlices.put(slice.getAllocationID(), slice);
-			return slice;
-		}
-
-		// we cannot accommodate the instance
-		return null;
-	}
-
-	/**
-	 * Removes the slice identified by the given allocation ID from
-	 * this instance and frees up the allocated resources.
-	 * 
-	 * @param allocationID
-	 *        the allocation ID of the slice to be removed
-	 * @return the slice which has been removed from the instance or <code>null</code> if no slice
-	 *         with the given allocation ID could be found
-	 */
-	synchronized AllocatedSlice removeAllocatedSlice(final AllocationID allocationID) {
-
-		final AllocatedSlice slice = this.allocatedSlices.remove(allocationID);
-		if (slice != null) {
-
-			this.remainingCapacity = InstanceTypeFactory.construct(this.remainingCapacity.getIdentifier(),
-				this.remainingCapacity
-					.getNumberOfComputeUnits()
-					+ slice.getType().getNumberOfComputeUnits(), this.remainingCapacity.getNumberOfCores()
-					+ slice.getType().getNumberOfCores(), this.remainingCapacity.getMemorySize()
-					+ slice.getType().getMemorySize(), this.remainingCapacity.getDiskCapacity()
-					+ slice.getType().getDiskCapacity(), this.remainingCapacity.getPricePerHour());
-		}
-
-		return slice;
-	}
-
-	/**
-	 * Removes all allocated slices on this instance and frees
-	 * up their allocated resources.
-	 * 
-	 * @return a list of all removed slices
-	 */
-	synchronized List<AllocatedSlice> removeAllAllocatedSlices() {
-
-		final List<AllocatedSlice> slices = new ArrayList<AllocatedSlice>(this.allocatedSlices.values());
-		final Iterator<AllocatedSlice> it = slices.iterator();
-		while (it.hasNext()) {
-			removeAllocatedSlice(it.next().getAllocationID());
-		}
-
-		return slices;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java
deleted file mode 100644
index 39d2132..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstanceNotifier.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.util.List;
-
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * This class is an auxiliary class to send the notification
- * about the availability of an {@link AbstractInstance} to the given {@link InstanceListener} object. The notification
- * must be sent from
- * a separate thread, otherwise the atomic operation of requesting an instance
- * for a vertex and switching to the state ASSIGNING could not be guaranteed.
- * This class is thread-safe.
- * 
- */
-public class ClusterInstanceNotifier extends Thread {
-
-	/**
-	 * The {@link InstanceListener} object to send the notification to.
-	 */
-	private final InstanceListener instanceListener;
-
-	/**
-	 * The ID of the job the notification refers to.
-	 */
-	private final JobID jobID;
-
-	/**
-	 * The allocated resources the notification refers to.
-	 */
-	private final List<AllocatedResource> allocatedResources;
-
-	/**
-	 * Constructs a new instance notifier object.
-	 * 
-	 * @param instanceListener
-	 *        the listener to send the notification to
-	 * @param jobID
-	 *        the ID of the job the notification refers to
-	 * @param allocatedResources
-	 *        the resources which have been allocated for the job
-	 */
-	public ClusterInstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
-			final List<AllocatedResource> allocatedResources) {
-		this.instanceListener = instanceListener;
-		this.jobID = jobID;
-		this.allocatedResources = allocatedResources;
-	}
-
-
-	@Override
-	public void run() {
-
-		this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
deleted file mode 100644
index 480e521..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
+++ /dev/null
@@ -1,945 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-import eu.stratosphere.nephele.util.SerializableHashMap;
-
-/**
- * Instance Manager for a static cluster.
- * <p>
- * The cluster manager can handle heterogeneous instances (compute nodes). Each instance type used in the cluster must
- * be described in the configuration.
- * <p>
- * This is a sample configuration: <code>
- * # definition of instances in format
- * # instancename,numComputeUnits,numCores,memorySize,diskCapacity,pricePerHour
- * instancemanager.cluster.type.1 = m1.small,2,1,2048,10,10
- * instancemanager.cluster.type. = c1.medium,2,1,2048,10,10
- * instancemanager.cluster.type. = m1.large,4,2,2048,10,10
- * instancemanager.cluster.type. = m1.xlarge,8,4,8192,20,20
- * instancemanager.cluster.type. = c1.xlarge,8,4,16384,20,40
- * 
- * # default instance type
- * instancemanager.cluster.defaulttype = 1 (pointing to m1.small)
- * </code> Each instance is expected to run exactly one {@link eu.stratosphere.nephele.taskmanager.TaskManager}. When
- * the {@link eu.stratosphere.nephele.taskmanager.TaskManager} registers with the
- * {@link eu.stratosphere.nephele.jobmanager.JobManager} it sends a {@link HardwareDescription} which describes the
- * actual hardware characteristics of the instance (compute node). The cluster manager will attempt to match the reported
- * hardware characteristics with one of the configured instance types. Moreover, the cluster manager is capable of
- * partitioning larger instances (compute nodes) into smaller, less powerful instances.
- */
-public class ClusterManager implements InstanceManager {
-
-	// ------------------------------------------------------------------------
-	// Internal Constants
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The log object used to report debugging and error information.
-	 */
-	private static final Log LOG = LogFactory.getLog(ClusterManager.class);
-
-	/**
-	 * Default duration after which a host is purged in case it did not send
-	 * a heart-beat message.
-	 */
-	private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
-
-	/**
-	 * The key to retrieve the clean up interval from the configuration.
-	 */
-	private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
-
-	// ------------------------------------------------------------------------
-	// Fields
-	// ------------------------------------------------------------------------
-
-	private final Object lock = new Object();
-	
-	/**
-	 * Duration after which a host is purged in case it did not send a
-	 * heart-beat message.
-	 */
-	private final long cleanUpInterval;
-
-	/**
-	 * The default instance type.
-	 */
-	private final InstanceType defaultInstanceType;
-
-	/**
-	 * Set of hosts known to run a task manager that are thus able to execute
-	 * tasks.
-	 */
-	private final Map<InstanceConnectionInfo, ClusterInstance> registeredHosts;
-
-	/**
-	 * Map of a {@link JobID} to all {@link AllocatedSlice}s that belong to this job.
-	 */
-	private final Map<JobID, List<AllocatedSlice>> slicesOfJobs;
-
-	/**
-	 * List of instance types that can be executed on this cluster, sorted by
-	 * price (cheapest to most expensive).
-	 */
-	private final InstanceType[] availableInstanceTypes;
-
-	/**
-	 * Map of instance type descriptions which can be queried by the job manager.
-	 */
-	private final Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptionMap;
-
-	/**
-	 * Map of IP addresses to instance types.
-	 */
-	private final Map<InetAddress, InstanceType> ipToInstanceTypeMapping = new HashMap<InetAddress, InstanceType>();
-
-	/**
-	 * Map of pending requests of a job, i.e. the instance requests that could not be fulfilled during the initial
-	 * instance request.
-	 */
-	private final Map<JobID, PendingRequestsMap> pendingRequestsOfJob = new LinkedHashMap<JobID, PendingRequestsMap>();
-
-	/**
-	 * The network topology of the cluster.
-	 */
-	private final NetworkTopology networkTopology;
-
-	/**
-	 * Object that is notified if instances become available or vanish.
-	 */
-	private InstanceListener instanceListener;
-
-	/**
-	 * Matrix storing how many instances of a particular type can be accommodated in another instance type.
-	 */
-	private final int[][] instanceAccommodationMatrix;
-
-	private boolean shutdown;
-	
-	/**
-	 * Periodic task that checks whether hosts have not sent their heart-beat
-	 * messages and purges the hosts in this case.
-	 */
-	private final TimerTask cleanupStaleMachines = new TimerTask() {
-
-		@Override
-		public void run() {
-
-			synchronized (ClusterManager.this.lock) {
-
-				final List<Map.Entry<InstanceConnectionInfo, ClusterInstance>> hostsToRemove =
-					new ArrayList<Map.Entry<InstanceConnectionInfo, ClusterInstance>>();
-
-				final Map<JobID, List<AllocatedResource>> staleResources = new HashMap<JobID, List<AllocatedResource>>();
-
-				// check all hosts whether they did not send heart-beat messages.
-				for (Map.Entry<InstanceConnectionInfo, ClusterInstance> entry : registeredHosts.entrySet()) {
-
-					final ClusterInstance host = entry.getValue();
-					if (!host.isStillAlive(cleanUpInterval)) {
-
-						// this host has not sent the heart-beat messages
-						// -> we terminate all instances running on this host and notify the jobs
-						final List<AllocatedSlice> removedSlices = host.removeAllAllocatedSlices();
-						for (AllocatedSlice removedSlice : removedSlices) {
-
-							final JobID jobID = removedSlice.getJobID();
-							final List<AllocatedSlice> slicesOfJob = slicesOfJobs.get(jobID);
-							if (slicesOfJob == null) {
-								LOG.error("Cannot find allocated slices for job with ID + " + jobID);
-								continue;
-							}
-
-							slicesOfJob.remove(removedSlice);
-
-							// Clean up
-							if (slicesOfJob.isEmpty()) {
-								slicesOfJobs.remove(jobID);
-							}
-
-							List<AllocatedResource> staleResourcesOfJob = staleResources.get(removedSlice.getJobID());
-							if (staleResourcesOfJob == null) {
-								staleResourcesOfJob = new ArrayList<AllocatedResource>();
-								staleResources.put(removedSlice.getJobID(), staleResourcesOfJob);
-							}
-
-							staleResourcesOfJob.add(new AllocatedResource(removedSlice.getHostingInstance(),
-								removedSlice.getType(),
-								removedSlice.getAllocationID()));
-						}
-
-						hostsToRemove.add(entry);
-					}
-				}
-
-				registeredHosts.entrySet().removeAll(hostsToRemove);
-
-				updateInstaceTypeDescriptionMap();
-
-				final Iterator<Map.Entry<JobID, List<AllocatedResource>>> it = staleResources.entrySet().iterator();
-				while (it.hasNext()) {
-					final Map.Entry<JobID, List<AllocatedResource>> entry = it.next();
-					if (instanceListener != null) {
-						instanceListener.allocatedResourcesDied(entry.getKey(), entry.getValue());
-					}
-				}
-			}
-		}
-	};
-
-	// ------------------------------------------------------------------------
-	// Constructor and set-up
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Constructor.
-	 */
-	public ClusterManager() {
-
-		this.registeredHosts = new HashMap<InstanceConnectionInfo, ClusterInstance>();
-
-		this.slicesOfJobs = new HashMap<JobID, List<AllocatedSlice>>();
-
-		// Load the instance type this cluster can offer
-		this.defaultInstanceType = InstanceTypeFactory.constructFromDescription(ConfigConstants.DEFAULT_INSTANCE_TYPE);
-		
-		this.availableInstanceTypes = new InstanceType[] { this.defaultInstanceType };
-		
-		this.instanceAccommodationMatrix = calculateInstanceAccommodationMatrix();
-
-		this.instanceTypeDescriptionMap = new SerializableHashMap<InstanceType, InstanceTypeDescription>();
-
-		long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
-
-		if (tmpCleanUpInterval < 10) { // Clean up interval must be at least ten seconds
-			LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
-				+ " secs.");
-			tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL;
-		}
-
-		this.cleanUpInterval = tmpCleanUpInterval;
-
-		// sort available instances by CPU core
-		sortAvailableInstancesByNumberOfCPUCores();
-
-		this.networkTopology = NetworkTopology.createEmptyTopology();
-
-		// look every 1000 milliseconds for crashed hosts
-		final boolean runTimerAsDaemon = true;
-		new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
-
-		// Load available instance types into the instance description list
-		updateInstaceTypeDescriptionMap();
-	}
-
-	/**
-	 * Sorts the list of available instance types by the number of CPU cores in a descending order.
-	 */
-	private void sortAvailableInstancesByNumberOfCPUCores() {
-
-		if (this.availableInstanceTypes.length < 2) {
-			return;
-		}
-
-		for (int i = 1; i < this.availableInstanceTypes.length; i++) {
-			final InstanceType it = this.availableInstanceTypes[i];
-			int j = i;
-			while (j > 0 && this.availableInstanceTypes[j - 1].getNumberOfCores() < it.getNumberOfCores()) {
-				this.availableInstanceTypes[j] = this.availableInstanceTypes[j - 1];
-				--j;
-			}
-			this.availableInstanceTypes[j] = it;
-		}
-	}
-
-	@Override
-	public void shutdown() {
-		synchronized (this.lock) {
-			if (this.shutdown) {
-				return;
-			}
-			
-			this.cleanupStaleMachines.cancel();
-			
-			Iterator<ClusterInstance> it = this.registeredHosts.values().iterator();
-			while (it.hasNext()) {
-				it.next().destroyProxies();
-			}
-			this.registeredHosts.clear();
-			
-			this.shutdown = true;
-		}
-	}
-
-	@Override
-	public InstanceType getDefaultInstanceType() {
-		return this.defaultInstanceType;
-	}
-
-	@Override
-	public InstanceType getInstanceTypeByName(String instanceTypeName) {
-		synchronized (this.lock) {
-			for (InstanceType it : availableInstanceTypes) {
-				if (it.getIdentifier().equals(instanceTypeName)) {
-					return it;
-				}
-			}
-		}
-
-		return null;
-	}
-
-
-	@Override
-	public InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores,
-			int minMemorySize, int minDiskCapacity, int maxPricePerHour)
-	{
-		// the instances are sorted by price -> the first instance that
-		// fulfills the requirements is suitable and the cheapest
-
-		synchronized (this.lock) {
-			for (InstanceType i : availableInstanceTypes) {
-				if (i.getNumberOfComputeUnits() >= minNumComputeUnits && i.getNumberOfCores() >= minNumCPUCores
-					&& i.getMemorySize() >= minMemorySize && i.getDiskCapacity() >= minDiskCapacity
-					&& i.getPricePerHour() <= maxPricePerHour) {
-					return i;
-				}
-			}
-		}
-		return null;
-	}
-
-
-	@Override
-	public void releaseAllocatedResource(JobID jobID, Configuration conf,
-			AllocatedResource allocatedResource) throws InstanceException
-	{
-		synchronized (this.lock) {
-			// release the instance from the host
-			final ClusterInstance clusterInstance = (ClusterInstance) allocatedResource.getInstance();
-			final AllocatedSlice removedSlice = clusterInstance.removeAllocatedSlice(allocatedResource.getAllocationID());
-
-			// remove the local association between instance and job
-			final List<AllocatedSlice> slicesOfJob = this.slicesOfJobs.get(jobID);
-			if (slicesOfJob == null) {
-				LOG.error("Cannot find allocated slice to release allocated slice for job " + jobID);
-				return;
-			}
-
-			slicesOfJob.remove(removedSlice);
-
-			// Clean up
-			if (slicesOfJob.isEmpty()) {
-				this.slicesOfJobs.remove(jobID);
-			}
-
-			// Check pending requests
-			checkPendingRequests();
-		}
-	}
-
-	/**
-	 * Creates a new {@link ClusterInstance} object to manage instances that can
-	 * be executed on that host.
-	 * 
-	 * @param instanceConnectionInfo
-	 *        the connection information for the instance
-	 * @param hardwareDescription
-	 *        the hardware description provided by the new instance
-	 * @return a new {@link ClusterInstance} object or <code>null</code> if the cluster instance could not be created
-	 */
-	private ClusterInstance createNewHost(final InstanceConnectionInfo instanceConnectionInfo,
-			final HardwareDescription hardwareDescription) {
-
-		// Check if there is a user-defined instance type for this IP address
-		InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.address());
-		if (instanceType != null) {
-			LOG.info("Found user-defined instance type for cluster instance with IP "
-				+ instanceConnectionInfo.address() + ": " + instanceType);
-		} else {
-			instanceType = matchHardwareDescriptionWithInstanceType(hardwareDescription);
-			if (instanceType != null) {
-				LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.address()
-					+ " matches with instance type " + instanceType);
-			} else {
-				LOG.error("No matching instance type, cannot create cluster instance");
-				return null;
-			}
-		}
-
-		// Try to match new host with a stub host from the existing topology
-		String instanceName = instanceConnectionInfo.hostname();
-		NetworkNode parentNode = this.networkTopology.getRootNode();
-		NetworkNode currentStubNode = null;
-
-		// Try to match new host using the host name
-		while (true) {
-
-			currentStubNode = this.networkTopology.getNodeByName(instanceName);
-			if (currentStubNode != null) {
-				break;
-			}
-
-			final int pos = instanceName.lastIndexOf('.');
-			if (pos == -1) {
-				break;
-			}
-
-			/*
-			 * If host name is reported as FQDN, iterative remove parts
-			 * of the domain name until a match occurs or no more dots
-			 * can be found in the host name.
-			 */
-			instanceName = instanceName.substring(0, pos);
-		}
-
-		// Try to match the new host using the IP address
-		if (currentStubNode == null) {
-			instanceName = instanceConnectionInfo.address().toString();
-			instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
-			currentStubNode = this.networkTopology.getNodeByName(instanceName);
-		}
-
-		if (currentStubNode != null) {
-			/*
-			 * The instance name will be the same as the one of the stub node. That way
-			 * the stub now will be removed from the network topology and replaced be
-			 * the new node.
-			 */
-			if (currentStubNode.getParentNode() != null) {
-				parentNode = currentStubNode.getParentNode();
-			}
-			// Remove the stub node from the tree
-			currentStubNode.remove();
-		}
-
-		LOG.info("Creating instance of type " + instanceType + " for " + instanceConnectionInfo + ", parent is "
-			+ parentNode.getName());
-		final ClusterInstance host = new ClusterInstance(instanceConnectionInfo, instanceType, parentNode,
-			this.networkTopology, hardwareDescription);
-
-		return host;
-	}
-
-	/**
-	 * Attempts to match the hardware characteristics provided by the {@link HardwareDescription} object with one
-	 * of the instance types set in the configuration. The matching is pessimistic, i.e. the hardware characteristics of
-	 * the chosen instance type never exceed the actually reported characteristics from the hardware description.
-	 * 
-	 * @param hardwareDescription
-	 *        the hardware description as reported by the instance
-	 * @return the best matching instance type or <code>null</code> if no matching instance type can be found
-	 */
-	private InstanceType matchHardwareDescriptionWithInstanceType(final HardwareDescription hardwareDescription) {
-
-		// Assumes that the available instance types are ordered by number of CPU cores in descending order
-		for (int i = 0; i < this.availableInstanceTypes.length; i++) {
-
-			final InstanceType candidateInstanceType = this.availableInstanceTypes[i];
-			// Check if number of CPU cores match
-			if (candidateInstanceType.getNumberOfCores() > hardwareDescription.getNumberOfCPUCores()) {
-				continue;
-			}
-
-			// Check if size of physical memory matches
-			final int memoryInMB = (int) (hardwareDescription.getSizeOfPhysicalMemory() / (1024L * 1024L));
-			if (candidateInstanceType.getMemorySize() > memoryInMB) {
-				continue;
-			}
-
-			return candidateInstanceType;
-		}
-
-		LOG.error("Cannot find matching instance type for hardware description ("
-			+ hardwareDescription.getNumberOfCPUCores() + " cores, " + hardwareDescription.getSizeOfPhysicalMemory()
-			+ " bytes of memory)");
-
-		return null;
-	}
-
-
-	@Override
-	public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription) {
-
-		synchronized (this.lock) {
-			ClusterInstance host = registeredHosts.get(instanceConnectionInfo);
-	
-			// check whether we have discovered a new host
-			if (host == null) {
-				host = createNewHost(instanceConnectionInfo, hardwareDescription);
-	
-				if (host == null) {
-					LOG.error("Could not create a new host object for incoming heart-beat. "
-						+ "Probably the configuration file is lacking some entries.");
-					return;
-				}
-	
-				this.registeredHosts.put(instanceConnectionInfo, host);
-				LOG.info("New number of registered hosts is " + this.registeredHosts.size());
-	
-				// Update the list of instance type descriptions
-				updateInstaceTypeDescriptionMap();
-	
-				// Check if a pending request can be fulfilled by the new host
-				checkPendingRequests();
-			}
-			
-			host.reportHeartBeat();
-		}
-	}
-
-	/**
-	 * Checks if a pending request can be fulfilled.
-	 */
-	private void checkPendingRequests() {
-
-		final Iterator<Map.Entry<JobID, PendingRequestsMap>> it = this.pendingRequestsOfJob.entrySet().iterator();
-		while (it.hasNext()) {
-
-			final List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
-			final Map.Entry<JobID, PendingRequestsMap> entry = it.next();
-			final JobID jobID = entry.getKey();
-			final PendingRequestsMap pendingRequestsMap = entry.getValue();
-			final Iterator<Map.Entry<InstanceType, Integer>> it2 = pendingRequestsMap.iterator();
-			while (it2.hasNext()) {
-
-				final Map.Entry<InstanceType, Integer> entry2 = it2.next();
-				final InstanceType requestedInstanceType = entry2.getKey();
-				int numberOfPendingInstances = entry2.getValue().intValue();
-
-				// Consistency check
-				if (numberOfPendingInstances <= 0) {
-					LOG.error("Inconsistency: Job " + jobID + " has " + numberOfPendingInstances
-						+ " requests for instance type " + requestedInstanceType.getIdentifier());
-					continue;
-				}
-
-				while (numberOfPendingInstances > 0) {
-
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Trying to allocate instance of type " + requestedInstanceType.getIdentifier());
-					}
-
-					// TODO: Introduce topology awareness here
-					final AllocatedSlice slice = getSliceOfType(jobID, requestedInstanceType);
-					if (slice == null) {
-						break;
-					} else {
-
-						LOG.info("Allocated instance of type " + requestedInstanceType.getIdentifier()
-							+ " as a result of pending request for job " + jobID);
-
-						// Decrease number of pending instances
-						--numberOfPendingInstances;
-						pendingRequestsMap.decreaseNumberOfPendingInstances(requestedInstanceType);
-
-						List<AllocatedSlice> allocatedSlices = this.slicesOfJobs.get(jobID);
-						if (allocatedSlices == null) {
-							allocatedSlices = new ArrayList<AllocatedSlice>();
-							this.slicesOfJobs.put(jobID, allocatedSlices);
-						}
-						allocatedSlices.add(slice);
-
-						allocatedResources.add(new AllocatedResource(slice.getHostingInstance(), slice.getType(), slice
-							.getAllocationID()));
-					}
-				}
-			}
-
-			if (!allocatedResources.isEmpty() && this.instanceListener != null) {
-
-				final ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
-					this.instanceListener, jobID, allocatedResources);
-
-				clusterInstanceNotifier.start();
-			}
-		}
-	}
-
-	/**
-	 * Attempts to allocate a slice of the given type for the given job. The method first attempts to allocate this
-	 * slice by finding a physical host which exactly matches the given instance type. If this attempt failed, it tries
-	 * to allocate the slice by partitioning the resources of a more powerful host.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the slice shall be allocated for
-	 * @param instanceType
-	 *        the instance type of the requested slice
-	 * @return the allocated slice or <code>null</code> if no such slice could be allocated
-	 */
-	private AllocatedSlice getSliceOfType(final JobID jobID, final InstanceType instanceType) {
-
-		AllocatedSlice slice = null;
-
-		// Try to match the instance type without slicing first
-		for (final ClusterInstance host : this.registeredHosts.values()) {
-			if (host.getType().equals(instanceType)) {
-				slice = host.createSlice(instanceType, jobID);
-				if (slice != null) {
-					break;
-				}
-			}
-		}
-
-		// Use slicing now if necessary
-		if (slice == null) {
-
-			for (final ClusterInstance host : this.registeredHosts.values()) {
-				slice = host.createSlice(instanceType, jobID);
-				if (slice != null) {
-					break;
-				}
-			}
-		}
-
-		return slice;
-	}
-
-
-	@Override
-	public void requestInstance(JobID jobID, Configuration conf,  InstanceRequestMap instanceRequestMap, List<String> splitAffinityList)
-		throws InstanceException
-	{
-		final List<AllocatedSlice> newlyAllocatedSlicesOfJob = new ArrayList<AllocatedSlice>();
-		final Map<InstanceType, Integer> pendingRequests = new HashMap<InstanceType, Integer>();
-
-		synchronized(this.lock) {
-			// Iterate over all instance types
-			for (Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMaximumIterator(); it.hasNext();) {
-	
-				// Iterate over all requested instances of a specific type
-				final Map.Entry<InstanceType, Integer> entry = it.next();
-				final int maximumNumberOfInstances = entry.getValue().intValue();
-	
-				for (int i = 0; i < maximumNumberOfInstances; i++) {
-	
-					LOG.info("Trying to allocate instance of type " + entry.getKey().getIdentifier());
-					
-					final AllocatedSlice slice = getSliceOfType(jobID, entry.getKey());
-	
-					if (slice == null) {
-						if (i < instanceRequestMap.getMinimumNumberOfInstances(entry.getKey())) {
-							// The request cannot be fulfilled, release the slices again and throw an exception
-							for (final AllocatedSlice sliceToRelease : newlyAllocatedSlicesOfJob) {
-								sliceToRelease.getHostingInstance().removeAllocatedSlice(sliceToRelease.getAllocationID());
-							}
-	
-							// TODO: Remove previously allocated slices again
-							throw new InstanceException("Could not find a suitable instance");
-						} else {
-	
-							// Remaining instances are pending
-							final int numberOfRemainingInstances = maximumNumberOfInstances - i;
-							if (numberOfRemainingInstances > 0) {
-	
-								// Store the request for the missing instances
-								Integer val = pendingRequests.get(entry.getKey());
-								if (val == null) {
-									val = Integer.valueOf(0);
-								}
-								val = Integer.valueOf(val.intValue() + numberOfRemainingInstances);
-								pendingRequests.put(entry.getKey(), val);
-							}
-	
-							break;
-						}
-					}
-	
-					newlyAllocatedSlicesOfJob.add(slice);
-				}
-			}
-	
-			// The request could be processed successfully, so update internal bookkeeping.
-			List<AllocatedSlice> allAllocatedSlicesOfJob = this.slicesOfJobs.get(jobID);
-			if (allAllocatedSlicesOfJob == null) {
-				allAllocatedSlicesOfJob = new ArrayList<AllocatedSlice>();
-				this.slicesOfJobs.put(jobID, allAllocatedSlicesOfJob);
-			}
-			allAllocatedSlicesOfJob.addAll(newlyAllocatedSlicesOfJob);
-	
-			PendingRequestsMap allPendingRequestsOfJob = this.pendingRequestsOfJob.get(jobID);
-			if (allPendingRequestsOfJob == null) {
-				allPendingRequestsOfJob = new PendingRequestsMap();
-				this.pendingRequestsOfJob.put(jobID, allPendingRequestsOfJob);
-			}
-			for (final Iterator<Map.Entry<InstanceType, Integer>> it = pendingRequests.entrySet().iterator(); it.hasNext();) {
-				final Map.Entry<InstanceType, Integer> entry = it.next();
-	
-				allPendingRequestsOfJob.addRequest(entry.getKey(), entry.getValue().intValue());
-			}
-	
-			// Finally, create the list of allocated resources for the scheduler
-			final List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
-			for (final AllocatedSlice slice : newlyAllocatedSlicesOfJob) {
-				allocatedResources.add(new AllocatedResource(slice.getHostingInstance(), slice.getType(), slice
-					.getAllocationID()));
-			}
-	
-			if (this.instanceListener != null) {
-				final ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
-					this.instanceListener, jobID, allocatedResources);
-				clusterInstanceNotifier.start();
-			}
-		}
-	}
-
-
-	@Override
-	public NetworkTopology getNetworkTopology(JobID jobID) {
-		return this.networkTopology;
-	}
-
-
-	@Override
-	public void setInstanceListener(InstanceListener instanceListener) {
-		synchronized (this.lock) {
-			this.instanceListener = instanceListener;
-		}
-	}
-
-
-	@Override
-	public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-		Map<InstanceType, InstanceTypeDescription> copyToReturn = new SerializableHashMap<InstanceType, InstanceTypeDescription>();
-		synchronized (this.lock) {
-			copyToReturn.putAll(this.instanceTypeDescriptionMap);
-		}
-		return copyToReturn;
-	}
-
-	/**
-	 * Updates the list of instance type descriptions based on the currently registered hosts.
-	 */
-	private void updateInstaceTypeDescriptionMap() {
-
-		// this.registeredHosts.values().iterator()
-		this.instanceTypeDescriptionMap.clear();
-
-		final List<InstanceTypeDescription> instanceTypeDescriptionList = new ArrayList<InstanceTypeDescription>();
-
-		// initialize array which stores the availability counter for each instance type
-		final int[] numberOfInstances = new int[this.availableInstanceTypes.length];
-		for (int i = 0; i < numberOfInstances.length; i++) {
-			numberOfInstances[i] = 0;
-		}
-
-		// Shuffle through instance types
-		for (int i = 0; i < this.availableInstanceTypes.length; i++) {
-
-			final InstanceType currentInstanceType = this.availableInstanceTypes[i];
-			int numberOfMatchingInstances = 0;
-			int minNumberOfCPUCores = Integer.MAX_VALUE;
-			long minSizeOfPhysicalMemory = Long.MAX_VALUE;
-			long minSizeOfFreeMemory = Long.MAX_VALUE;
-			final Iterator<ClusterInstance> it = this.registeredHosts.values().iterator();
-			while (it.hasNext()) {
-				final ClusterInstance clusterInstance = it.next();
-				if (clusterInstance.getType().equals(currentInstanceType)) {
-					++numberOfMatchingInstances;
-					final HardwareDescription hardwareDescription = clusterInstance.getHardwareDescription();
-					minNumberOfCPUCores = Math.min(minNumberOfCPUCores, hardwareDescription.getNumberOfCPUCores());
-					minSizeOfPhysicalMemory = Math.min(minSizeOfPhysicalMemory,
-						hardwareDescription.getSizeOfPhysicalMemory());
-					minSizeOfFreeMemory = Math.min(minSizeOfFreeMemory, hardwareDescription.getSizeOfFreeMemory());
-				}
-			}
-
-			// Update number of instances
-			int highestAccommodationNumber = -1;
-			int highestAccommodationIndex = -1;
-			for (int j = 0; j < this.availableInstanceTypes.length; j++) {
-				final int accommodationNumber = canBeAccommodated(j, i);
-				// LOG.debug(this.availableInstanceTypes[j].getIdentifier() + " fits into "
-				// + this.availableInstanceTypes[i].getIdentifier() + " " + accommodationNumber + " times");
-				if (accommodationNumber > 0) {
-					numberOfInstances[j] += numberOfMatchingInstances * accommodationNumber;
-					if (accommodationNumber > highestAccommodationNumber) {
-						highestAccommodationNumber = accommodationNumber;
-						highestAccommodationIndex = j;
-					}
-				}
-			}
-
-			// Calculate hardware description
-			HardwareDescription pessimisticHardwareDescription = null;
-			if (minNumberOfCPUCores < Integer.MAX_VALUE && minSizeOfPhysicalMemory < Long.MAX_VALUE
-				&& minSizeOfFreeMemory < Long.MAX_VALUE) {
-
-				pessimisticHardwareDescription = HardwareDescriptionFactory.construct(minNumberOfCPUCores,
-					minSizeOfPhysicalMemory, minSizeOfFreeMemory);
-
-			} else {
-
-				if (highestAccommodationIndex < i) { // Since highestAccommodationIndex smaller than my index, the
-														// target instance must be more powerful
-
-					final InstanceTypeDescription descriptionOfLargerInstanceType = instanceTypeDescriptionList
-						.get(highestAccommodationIndex);
-					if (descriptionOfLargerInstanceType.getHardwareDescription() != null) {
-						final HardwareDescription hardwareDescriptionOfLargerInstanceType = descriptionOfLargerInstanceType
-							.getHardwareDescription();
-
-						final int numCores = hardwareDescriptionOfLargerInstanceType.getNumberOfCPUCores()
-							/ highestAccommodationNumber;
-						final long physMem = hardwareDescriptionOfLargerInstanceType.getSizeOfPhysicalMemory()
-							/ highestAccommodationNumber;
-						final long freeMem = hardwareDescriptionOfLargerInstanceType.getSizeOfFreeMemory()
-							/ highestAccommodationNumber;
-
-						pessimisticHardwareDescription = HardwareDescriptionFactory.construct(numCores, physMem,
-							freeMem);
-					}
-				}
-			}
-
-			instanceTypeDescriptionList.add(InstanceTypeDescriptionFactory.construct(currentInstanceType,
-				pessimisticHardwareDescription, numberOfInstances[i]));
-		}
-
-		final Iterator<InstanceTypeDescription> it = instanceTypeDescriptionList.iterator();
-		while (it.hasNext()) {
-
-			final InstanceTypeDescription itd = it.next();
-			this.instanceTypeDescriptionMap.put(itd.getInstanceType(), itd);
-		}
-	}
-
-	/**
-	 * Calculates the instance accommodation matrix which stores how many times a particular instance type can be
-	 * accommodated inside another instance type based on the list of available instance types.
-	 * 
-	 * @return the instance accommodation matrix
-	 */
-	private int[][] calculateInstanceAccommodationMatrix() {
-
-		if (this.availableInstanceTypes == null) {
-			LOG.error("Cannot compute instance accommodation matrix: availableInstanceTypes is null");
-			return null;
-		}
-
-		final int matrixSize = this.availableInstanceTypes.length;
-		final int[][] am = new int[matrixSize][matrixSize];
-
-		// Populate matrix
-		for (int i = 0; i < matrixSize; i++) {
-			for (int j = 0; j < matrixSize; j++) {
-
-				if (i == j) {
-					am[i][j] = 1;
-				} else {
-
-					final InstanceType sourceType = this.availableInstanceTypes[i];
-					InstanceType targetType = this.availableInstanceTypes[j];
-
-					// How many times can we accommodate source type into target type?
-					final int cores = targetType.getNumberOfCores() / sourceType.getNumberOfCores();
-					final int cu = targetType.getNumberOfComputeUnits() / sourceType.getNumberOfComputeUnits();
-					final int mem = targetType.getMemorySize() / sourceType.getMemorySize();
-					final int disk = targetType.getDiskCapacity() / sourceType.getDiskCapacity();
-
-					am[i][j] = Math.min(cores, Math.min(cu, Math.min(mem, disk)));
-				}
-			}
-		}
-
-		return am;
-	}
-
-	/**
-	 * Returns how many times the instance type stored at index <code>sourceTypeIndex</code> can be accommodated inside
-	 * the instance type stored at index <code>targetTypeIndex</code> in the list of available instance types.
-	 * 
-	 * @param sourceTypeIndex
-	 *        the index of the source instance type in the list of available instance types
-	 * @param targetTypeIndex
-	 *        the index of the target instance type in the list of available instance types
-	 * @return the number of times the source type instance can be accommodated inside the target instance
-	 */
-	private int canBeAccommodated(int sourceTypeIndex, int targetTypeIndex) {
-
-		if (sourceTypeIndex >= this.availableInstanceTypes.length
-			|| targetTypeIndex >= this.availableInstanceTypes.length) {
-			LOG.error("Cannot determine number of instance accomodations: invalid index");
-			return 0;
-		}
-
-		return this.instanceAccommodationMatrix[targetTypeIndex][sourceTypeIndex];
-	}
-
-
-	@Override
-	public AbstractInstance getInstanceByName(String name) {
-		if (name == null) {
-			throw new IllegalArgumentException("Argument name must not be null");
-		}
-
-		synchronized (this.lock) {
-			final Iterator<ClusterInstance> it = this.registeredHosts.values().iterator();
-			while (it.hasNext()) {
-				final AbstractInstance instance = it.next();
-				if (name.equals(instance.getName())) {
-					return instance;
-				}
-			}
-		}
-
-		return null;
-	}
-
-
-	@Override
-	public void cancelPendingRequests(JobID jobID) {
-		synchronized (this.lock) {
-			this.pendingRequestsOfJob.remove(jobID);
-		}
-	}
-
-	@Override
-	public int getNumberOfTaskTrackers() {
-		return this.registeredHosts.size();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java
deleted file mode 100644
index ddc90e9..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/PendingRequestsMap.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import eu.stratosphere.nephele.instance.InstanceType;
-
-/**
- * This class represents a pending request, i.e. a request for a particular type and number of {@link AbstractInstance}
- * objects which could not be fulfilled yet.
- * <p>
- * This class is not thread-safe.
- * 
- */
-public final class PendingRequestsMap {
-
-	/**
-	 * The map storing the pending instance requests for the job this pending request object belongs to.
-	 */
-	private final Map<InstanceType, Integer> pendingRequests = new HashMap<InstanceType, Integer>();
-
-	/**
-	 * Checks if the job this object belongs to has pending instance requests.
-	 * 
-	 * @return <code>true</code> if the job this object belongs to has pending instance requests, <code>false</code>
-	 *         otherwise
-	 */
-	boolean hasPendingRequests() {
-
-		return !(this.pendingRequests.isEmpty());
-	}
-
-	/**
-	 * Adds the a pending request for the given number of instances of the given type to this map.
-	 * 
-	 * @param instanceType
-	 *        the requested instance type
-	 * @param numberOfInstances
-	 *        the requested number of instances of this type
-	 */
-	void addRequest(final InstanceType instanceType, final int numberOfInstances) {
-
-		Integer numberOfRemainingInstances = this.pendingRequests.get(instanceType);
-		if (numberOfRemainingInstances == null) {
-			numberOfRemainingInstances = Integer.valueOf(numberOfInstances);
-		} else {
-			numberOfRemainingInstances = Integer.valueOf(numberOfRemainingInstances.intValue() + numberOfInstances);
-		}
-
-		this.pendingRequests.put(instanceType, numberOfRemainingInstances);
-	}
-
-	/**
-	 * Returns an iterator for the pending requests encapsulated in this map.
-	 * 
-	 * @return an iterator for the pending requests encapsulated in this map
-	 */
-	Iterator<Map.Entry<InstanceType, Integer>> iterator() {
-
-		return this.pendingRequests.entrySet().iterator();
-	}
-
-	/**
-	 * Decreases the number of remaining instances to request of the given type.
-	 * 
-	 * @param instanceType
-	 *        the instance type for which the number of remaining instances shall be decreased
-	 */
-	void decreaseNumberOfPendingInstances(final InstanceType instanceType) {
-
-		Integer numberOfRemainingInstances = this.pendingRequests.get(instanceType);
-		if (numberOfRemainingInstances == null) {
-			return;
-		}
-
-		numberOfRemainingInstances = Integer.valueOf(numberOfRemainingInstances.intValue() - 1);
-		if (numberOfRemainingInstances.intValue() == 0) {
-			this.pendingRequests.remove(instanceType);
-		} else {
-			this.pendingRequests.put(instanceType, numberOfRemainingInstances);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java
deleted file mode 100644
index 795889e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstance.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.local;
-
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.topology.NetworkNode;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-
-public class LocalInstance extends AbstractInstance {
-
-	public LocalInstance(InstanceType instanceType, InstanceConnectionInfo instanceConnectionInfo,
-			NetworkNode parentNode, NetworkTopology networkTopology, HardwareDescription hardwareDescription) {
-		super(instanceType, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);
-	}
-
-
-	@Override
-	public String toString() {
-
-		return this.getInstanceConnectionInfo().toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
deleted file mode 100644
index e888b3f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.local;
-
-import java.io.File;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import eu.stratosphere.nephele.ExecutionMode;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.TaskManager;
-import eu.stratosphere.nephele.topology.NetworkTopology;
-import eu.stratosphere.nephele.util.SerializableHashMap;
-
-/**
- * The local instance manager is designed to manage instance allocation/deallocation for a single-node setup. It spawns a
- * task manager which is executed within the same process as the job manager. Moreover, it determines the hardware
- * characteristics of the machine it runs on and generates a default instance type with the identifier "default". If
- * desired this default instance type can also be overwritten.
- */
-public class LocalInstanceManager implements InstanceManager {
-
-	/**
-	 * The log object used to report events and errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(LocalInstanceManager.class);
-
-	/**
-	 * The key for the configuration parameter defining the instance type to be used by the local instance manager. If
-	 * the parameter is not set, a default instance type with the identifier "default" is generated from the machine's
-	 * hardware characteristics.
-	 */
-
-	private static final String LOCALINSTANCE_TYPE_KEY = "instancemanager.local.type";
-
-	private static final int SLEEP_TIME = 50;
-
-	private static final int START_STOP_TIMEOUT = 2000;
-
-
-	/**
-	 * The instance listener registered with this instance manager.
-	 */
-	private InstanceListener instanceListener;
-
-	/**
-	 * The default instance type which is either generated from the hardware characteristics of the machine the local
-	 * instance manager runs on or read from the configuration.
-	 */
-	private final InstanceType defaultInstanceType;
-
-	/**
-	 * A synchronization object to protect critical sections.
-	 */
-	private final Object synchronizationObject = new Object();
-
-	/**
-	 * Stores which task manager is currently occupied by a job.
-	 */
-	private Map<LocalInstance, AllocatedResource> allocatedResources = new HashMap<LocalInstance, AllocatedResource>();
-
-	/**
-	 * The local instances encapsulating the task managers
-	 */
-	private Map<InstanceConnectionInfo, LocalInstance> localInstances = new HashMap<InstanceConnectionInfo,
-					LocalInstance>();
-
-	/**
-	 * The threads running the local task managers.
-	 */
-	private final List<TaskManager> taskManagers = new ArrayList<TaskManager>();
-
-	/**
-	 * The network topology the local instance is part of.
-	 */
-	private final NetworkTopology networkTopology;
-
-	/**
-	 * The map of instance type descriptions.
-	 */
-	private final Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptionMap;
-
-	/**
-	 * Number of task managers
-	 */
-	private final int numTaskManagers;
-
-
-
-
-	/**
-	 * Constructs a new local instance manager.
-	 *
-	 */
-	public LocalInstanceManager() throws Exception {
-
-		final Configuration config = GlobalConfiguration.getConfiguration();
-
-		// get the default instance type
-		InstanceType type = null;
-		final String descr = config.getString(LOCALINSTANCE_TYPE_KEY, null);
-		if (descr != null) {
-			LOG.info("Attempting to parse default instance type from string " + descr);
-			type = InstanceTypeFactory.constructFromDescription(descr);
-			if (type == null) {
-				LOG.warn("Unable to parse default instance type from configuration, using hardware profile instead");
-			}
-		}
-
-		this.defaultInstanceType = (type != null) ? type : createDefaultInstanceType();
-
-		LOG.info("Default instance type is " + this.defaultInstanceType.getIdentifier());
-
-		this.networkTopology = NetworkTopology.createEmptyTopology();
-
-		this.instanceTypeDescriptionMap = new SerializableHashMap<InstanceType, InstanceTypeDescription>();
-
-		numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants
-				.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
-
-		ExecutionMode executionMode = (numTaskManagers > 1) ? ExecutionMode.CLUSTER : ExecutionMode.LOCAL;
-
-		for(int i=0; i< numTaskManagers; i++){
-
-			Configuration tm = new Configuration();
-			int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-					ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
-			int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-					ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
-
-			tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
-			tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
-
-			GlobalConfiguration.includeConfiguration(tm);
-
-			TaskManager t = new TaskManager(executionMode);
-			taskManagers.add(t);
-		}
-	}
-
-
-	@Override
-	public InstanceType getDefaultInstanceType() {
-		return this.defaultInstanceType;
-	}
-
-
-	@Override
-	public InstanceType getInstanceTypeByName(String instanceTypeName) {
-		if (this.defaultInstanceType.getIdentifier().equals(instanceTypeName)) {
-			return this.defaultInstanceType;
-		}
-
-		return null;
-	}
-
-
-	@Override
-	public InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores,
-			int minMemorySize, int minDiskCapacity, int maxPricePerHour) {
-
-		if (minNumComputeUnits > this.defaultInstanceType.getNumberOfComputeUnits()) {
-			return null;
-		}
-
-		if (minNumCPUCores > this.defaultInstanceType.getNumberOfCores()) {
-			return null;
-		}
-
-		if (minMemorySize > this.defaultInstanceType.getMemorySize()) {
-			return null;
-		}
-
-		if (minDiskCapacity > this.defaultInstanceType.getDiskCapacity()) {
-			return null;
-		}
-
-		if (maxPricePerHour > this.defaultInstanceType.getPricePerHour()) {
-			return null;
-		}
-
-		return this.defaultInstanceType;
-	}
-
-
-	@Override
-	public void releaseAllocatedResource(final JobID jobID, final Configuration conf,
-			final AllocatedResource allocatedResource)
-			throws InstanceException {
-		LocalInstance instance = (LocalInstance) allocatedResource.getInstance();
-
-		synchronized (this.synchronizationObject) {
-			if(allocatedResources.containsKey(allocatedResource.getInstance())){
-				if(allocatedResources.get(instance).equals(allocatedResource)){
-					allocatedResources.remove(instance);
-					return;
-				}
-			}
-			throw new InstanceException("Resource with allocation ID " + allocatedResource.getAllocationID()
-					+ " has not been allocated to job with ID " + jobID
-					+ " according to the local instance manager's internal bookkeeping");
-
-		}
-	}
-
-
-	@Override
-	public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo,
-			final HardwareDescription hardwareDescription) {
-
-		synchronized (this.synchronizationObject) {
-			if(!localInstances.containsKey(instanceConnectionInfo)){
-				LocalInstance localInstance = new LocalInstance(this.defaultInstanceType, instanceConnectionInfo,
-						this.networkTopology.getRootNode(), this.networkTopology, hardwareDescription);
-				localInstances.put(instanceConnectionInfo, localInstance);
-
-				this.instanceTypeDescriptionMap.put(this.defaultInstanceType, InstanceTypeDescriptionFactory
-						.construct(this.defaultInstanceType, hardwareDescription, localInstances.size()));
-			}
-		}
-	}
-
-
-	@Override
-	public void shutdown() {
-		// Stop the task managers
-		for(TaskManager t : taskManagers){
-			t.shutdown();
-		}
-
-		boolean areAllTaskManagerShutdown = false;
-		int timeout = START_STOP_TIMEOUT * this.taskManagers.size();
-
-		for(int sleep = 0; sleep < timeout; sleep += SLEEP_TIME){
-			areAllTaskManagerShutdown = true;
-
-			for(TaskManager t: taskManagers){
-				if(!t.isShutDown()){
-					areAllTaskManagerShutdown = false;
-					break;
-				}
-			}
-
-			if(areAllTaskManagerShutdown){
-				break;
-			}
-
-			try {
-				Thread.sleep(SLEEP_TIME);
-			}catch(InterruptedException e){
-				break;
-			}
-		}
-
-		if(!areAllTaskManagerShutdown){
-			throw new RuntimeException(String.format("TaskManager shut down timed out (%d ms).", timeout));
-		}
-
-		instanceTypeDescriptionMap.clear();
-
-		synchronized(this.synchronizationObject){
-			for(LocalInstance instance: this.localInstances.values()){
-				instance.destroyProxies();
-			}
-
-			localInstances.clear();
-		}
-	}
-
-
-	@Override
-	public NetworkTopology getNetworkTopology(final JobID jobID) {
-		return this.networkTopology;
-	}
-
-
-	@Override
-	public void setInstanceListener(final InstanceListener instanceListener) {
-		this.instanceListener = instanceListener;
-	}
-
-	/**
-	 * Creates a default instance type based on the hardware characteristics of the machine that calls this method. The
-	 * default instance type contains the machine's number of CPU cores and size of physical memory. The disc capacity
-	 * is calculated from the free space in the directory for temporary files.
-	 * 
-	 * @return the default instance type used for the local machine
-	 */
-	public static final InstanceType createDefaultInstanceType() {
-		final HardwareDescription hardwareDescription = HardwareDescriptionFactory.extractFromSystem();
-
-		int diskCapacityInGB = 0;
-		final String[] tempDirs = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(File.pathSeparator);
-		
-		for (final String tempDir : tempDirs) {
-			if (tempDir != null) {
-				File f = new File(tempDir);
-				diskCapacityInGB = Math.max(diskCapacityInGB, (int) (f.getFreeSpace() / (1024L * 1024L * 1024L)));
-			}
-		}
-
-		final int physicalMemory = (int) (hardwareDescription.getSizeOfPhysicalMemory() / (1024L * 1024L));
-
-		return InstanceTypeFactory.construct("default", hardwareDescription.getNumberOfCPUCores(),
-			hardwareDescription.getNumberOfCPUCores(), physicalMemory, diskCapacityInGB, 0);
-	}
-
-
-	@Override
-	public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-		return this.instanceTypeDescriptionMap;
-	}
-
-	@Override
-	public void requestInstance(final JobID jobID, final Configuration conf,
-			final InstanceRequestMap instanceRequestMap,
-			final List<String> splitAffinityList) throws InstanceException {
-
-		// TODO: This can be implemented way simpler...
-		// Iterate over all instance types
-		final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
-		final List<AllocatedResource> assignedResources = new ArrayList<AllocatedResource>();
-		boolean assignmentSuccessful = true;
-
-		while (it.hasNext()) {
-
-			// Iterate over all requested instances of a specific type
-			final Map.Entry<InstanceType, Integer> entry = it.next();
-
-			for (int i = 0; i < entry.getValue().intValue(); i++) {
-
-				synchronized (this.synchronizationObject) {
-					boolean instanceFound = false;
-					for(LocalInstance instance: localInstances.values()){
-						if(!allocatedResources.containsKey(instance)){
-							AllocatedResource assignedResource = new AllocatedResource(instance, entry.getKey(),
-									new AllocationID());
-							allocatedResources.put(instance, assignedResource);
-							assignedResources.add(assignedResource);
-							instanceFound = true;
-							break;
-						}
-					}
-
-					assignmentSuccessful &= instanceFound;
-				}
-			}
-		}
-
-		if(assignmentSuccessful){
-			new LocalInstanceNotifier(this.instanceListener, jobID, assignedResources).start();
-		}else{
-			throw new InstanceException("Could not satisfy instance request.");
-		}
-	}
-
-	@Override
-	public AbstractInstance getInstanceByName(final String name) {
-		if (name == null) {
-			throw new IllegalArgumentException("Argument name must not be null");
-		}
-
-		synchronized (this.synchronizationObject) {
-			for(LocalInstance instance :localInstances.values()){
-				if(name.equals(instance.getName())){
-					return instance;
-				}
-			}
-		}
-		return null;
-	}
-
-
-	@Override
-	public void cancelPendingRequests(final JobID jobID) {
-		// The local instance manager does not support pending requests, so nothing to do here
-	}
-
-	@Override
-	public int getNumberOfTaskTrackers() {
-		return localInstances.size();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java
deleted file mode 100644
index 52da691..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceNotifier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.local;
-
-import java.util.List;
-
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * This class is an auxiliary class to send the notification
- * about the availability of an {@link AllocatedResource} to the given {@link InstanceListener} object. The notification
- * must be sent from
- * a separate thread, otherwise the atomic operation of requesting an instance
- * for a vertex and switching to the state ASSIGNING could not be guaranteed.
- * This class is thread-safe.
- * 
- */
-public class LocalInstanceNotifier extends Thread {
-
-	/**
-	 * The {@link InstanceListener} object to send the notification to.
-	 */
-	private final InstanceListener instanceListener;
-
-	/**
-	 * The ID of the job the new instance belongs to.
-	 */
-	private final JobID jobID;
-
-	/**
-	 * The resources allocated for the job.
-	 */
-	private final List<AllocatedResource> allocatedResources;
-
-	/**
-	 * Constructs a new instance notifier object.
-	 * 
-	 * @param instanceListener
-	 *        the listener object to send the notification to
-	 * @param jobID
-	 *        the ID of the job the newly allocated resources belongs to
-	 * @param allocatedResources
-	 *        the resources allocated for the job
-	 */
-	public LocalInstanceNotifier(final InstanceListener instanceListener, final JobID jobID, final List<AllocatedResource> allocatedResources) {
-		this.instanceListener = instanceListener;
-		this.jobID = jobID;
-		this.allocatedResources = allocatedResources;
-	}
-
-
-	@Override
-	public void run() {
-		
-		this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index b4c51f2..d64c622 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -67,16 +67,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	private int numberOfSubtasks = -1;
 
 	/**
-	 * The type of instance to be assigned to this task at runtime.
-	 */
-	private String instanceType = null;
-
-	/**
-	 * Number of subtasks to share the same instance at runtime.
-	 */
-	private int numberOfSubtasksPerInstance = -1;
-
-	/**
 	 * Number of retries in case of an error before the task represented by this vertex is considered as failed.
 	 */
 	private int numberOfExecutionRetries = -1;
@@ -150,8 +140,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	 *        the vertex this vertex should connect to
 	 * @param channelType
 	 *        the channel type the two vertices should be connected by at runtime
-	 * @param compressionLevel
-	 *        the compression level the corresponding channel should have at runtime
 	 * @throws JobGraphDefinitionException
 	 *         thrown if the given vertex cannot be connected to <code>vertex</code> in the requested manner
 	 */
@@ -166,8 +154,8 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	 *        the vertex this vertex should connect to
 	 * @param channelType
 	 *        the channel type the two vertices should be connected by at runtime
-	 * @param compressionLevel
-	 *        the compression level the corresponding channel should have at runtime
+	 * @param distributionPattern
+	 *        the distribution pattern between the two job vertices
 	 * @throws JobGraphDefinitionException
 	 *         thrown if the given vertex cannot be connected to <code>vertex</code> in the requested manner
 	 */
@@ -184,14 +172,14 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	 *        the vertex this vertex should connect to
 	 * @param channelType
 	 *        the channel type the two vertices should be connected by at runtime
-	 * @param compressionLevel
-	 *        the compression level the corresponding channel should have at runtime
 	 * @param indexOfOutputGate
 	 *        index of the producing task's output gate to be used, <code>-1</code> will determine the next free index
 	 *        number
 	 * @param indexOfInputGate
 	 *        index of the consuming task's input gate to be used, <code>-1</code> will determine the next free index
 	 *        number
+	 * @param distributionPattern
+	 * 		  the distribution pattern between the two job vertices
 	 * @throws JobGraphDefinitionException
 	 *         thrown if the given vertex cannot be connected to <code>vertex</code> in the requested manner
 	 */
@@ -274,12 +262,12 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	 *        the job vertex to connect to
 	 * @param channelType
 	 *        the channel type the two vertices should be connected by at runtime
-	 * @param compressionLevel
-	 *        the compression level the corresponding channel should have at runtime
 	 * @param indexOfOutputGate
 	 *        index of the producing task's output gate to be used
 	 * @param indexOfInputGate
 	 *        index of the consuming task's input gate to be used
+	 * @param distributionPattern
+	 * 		  the distribution pattern between the two job vertices
 	 */
 	private void connectBacklink(final AbstractJobVertex vertex, final ChannelType channelType,
 			final int indexOfOutputGate, final int indexOfInputGate,
@@ -364,32 +352,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	}
 
 	/**
-	 * Returns the index of the edge which is used to connect the given job vertex to this job vertex.
-	 * 
-	 * @param jv
-	 *        the connected job vertex
-	 * @return the index of the edge which is used to connect the given job vertex to this job vertex or -1 if the given
-	 *         vertex is not connected to this job vertex
-	 */
-	/*
-	 * public int getBackwardConnectionIndex(AbstractJobVertex jv) {
-	 * if(jv == null) {
-	 * return -1;
-	 * }
-	 * final Iterator<JobEdge> it = this.backwardEdges.iterator();
-	 * int i = 0;
-	 * while(it.hasNext()) {
-	 * final JobEdge edge = it.next();
-	 * if(edge.getConnectedVertex() == jv) {
-	 * return i;
-	 * }
-	 * i++;
-	 * }
-	 * return -1;
-	 * }
-	 */
-
-	/**
 	 * Returns the ID of this job vertex.
 	 * 
 	 * @return the ID of this job vertex
@@ -407,15 +369,9 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 			throw new IOException("jobGraph is null, cannot deserialize");
 		}
 
-		// Read instance type
-		this.instanceType = StringRecord.readString(in);
-
 		// Read number of subtasks
 		this.numberOfSubtasks = in.readInt();
 
-		// Read number of subtasks per instance
-		this.numberOfSubtasksPerInstance = in.readInt();
-
 		// Number of execution retries
 		this.numberOfExecutionRetries = in.readInt();
 
@@ -489,15 +445,9 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	@Override
 	public void write(final DataOutput out) throws IOException {
 
-		// Instance type
-		StringRecord.writeString(out, this.instanceType);
-
 		// Number of subtasks
 		out.writeInt(this.numberOfSubtasks);
 
-		// Number of subtasks per instance
-		out.writeInt(this.numberOfSubtasksPerInstance);
-
 		// Number of execution retries
 		out.writeInt(this.numberOfExecutionRetries);
 
@@ -595,44 +545,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 	}
 
 	/**
-	 * Sets the instance type the task this vertex represents should run on.
-	 * 
-	 * @param instanceType
-	 *        the instance type the task this vertex represents should run on
-	 */
-	public void setInstanceType(final String instanceType) {
-		this.instanceType = instanceType;
-	}
-
-	/**
-	 * Returns the instance type the task this vertex represents should run on.
-	 * 
-	 * @return the instance type the task this vertex represents should run on, <code>null</code> if unspecified
-	 */
-	public String getInstanceType() {
-		return this.instanceType;
-	}
-
-	/**
-	 * Sets the number of subtasks that should be assigned to the same instance.
-	 * 
-	 * @param numberOfSubtasksPerInstance
-	 *        the number of subtasks that should be assigned to the same instance
-	 */
-	public void setNumberOfSubtasksPerInstance(final int numberOfSubtasksPerInstance) {
-		this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance;
-	}
-
-	/**
-	 * Returns the number of subtasks that should be assigned to the same instance, <code>-1</code> if undefined.
-	 * 
-	 * @return the number of subtasks that should be assigned to the same instance, <code>-1</code> if undefined
-	 */
-	public int getNumberOfSubtasksPerInstance() {
-		return this.numberOfSubtasksPerInstance;
-	}
-
-	/**
 	 * Sets the vertex this vertex should share its instances with at runtime.
 	 * 
 	 * @param vertex

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
index bf017ce..b043ecd 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/DeploymentManager.java
@@ -16,19 +16,19 @@ package eu.stratosphere.nephele.jobmanager;
 import java.util.List;
 
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.jobgraph.JobID;
 
 /**
  * A deployment manager is responsible for deploying a list of {@link ExecutionVertex} objects on the given
- * {@link AbstractInstance}. It is called by a {@link AbstractScheduler} implementation whenever at least one
+ * {@link eu.stratosphere.nephele.instance.Instance}. It is called by a {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} implementation whenever at least one
  * {@link ExecutionVertex} has become ready to be executed.
  * 
  */
 public interface DeploymentManager {
 
 	/**
-	 * Deploys the list of vertices on the given {@link AbstractInstance}.
+	 * Deploys the list of vertices on the given {@link eu.stratosphere.nephele.instance.Instance}.
 	 * 
 	 * @param jobID
 	 *        the ID of the job the vertices to be deployed belong to
@@ -37,5 +37,5 @@ public interface DeploymentManager {
 	 * @param verticesToBeDeployed
 	 *        the list of vertices to be deployed
 	 */
-	void deploy(JobID jobID, AbstractInstance instance, List<ExecutionVertex> verticesToBeDeployed);
+	void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
index a6f9cfe..37f9a43 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java
@@ -38,7 +38,7 @@ import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
 import eu.stratosphere.nephele.executiongraph.JobStatusListener;
 import eu.stratosphere.nephele.executiongraph.ManagementGraphFactory;
 import eu.stratosphere.nephele.executiongraph.VertexAssignmentListener;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobgraph.JobStatus;
@@ -266,10 +266,10 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 			final ManagementVertexID managementVertexID = id.toManagementVertexID();
 			final long timestamp = System.currentTimeMillis();
 
-			final AbstractInstance instance = newAllocatedResource.getInstance();
+			final Instance instance = newAllocatedResource.getInstance();
 			VertexAssignmentEvent event;
 			if (instance == null) {
-				event = new VertexAssignmentEvent(timestamp, managementVertexID, "null", "null");
+				event = new VertexAssignmentEvent(timestamp, managementVertexID, "null");
 			} else {
 
 				String instanceName = null;
@@ -279,8 +279,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 					instanceName = instance.toString();
 				}
 
-				event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName, instance.getType()
-					.getIdentifier());
+				event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName);
 			}
 
 			this.eventCollector.updateManagementGraph(jobID, event);
@@ -609,7 +608,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 			}
 
 			vertex.setInstanceName(vertexAssignmentEvent.getInstanceName());
-			vertex.setInstanceType(vertexAssignmentEvent.getInstanceType());
 		}
 	}