You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:27 UTC

[06/22] Rework the Taskmanager to a slot based model and remove legacy cloud code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
deleted file mode 100644
index e369613..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.jobmanager.scheduler;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
-import eu.stratosphere.nephele.executiongraph.ExecutionGate;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.util.SerializableHashSet;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * Static helper routines to recover a job from the failure of a single {@link ExecutionVertex}.
- * <p>
- * Recovery cancels every (transitive) predecessor of the failed vertex that currently holds a real instance,
- * invalidates the receiver lookup caches on all instances connected to the affected vertices and finally resets
- * the failed vertex so it can be executed again.
- */
-public final class RecoveryLogic {
-
-	/**
-	 * The logger to report information and problems.
-	 */
-	private static final Log LOG = LogFactory.getLog(RecoveryLogic.class);
-
-	/**
-	 * Private constructor so class cannot be instantiated.
-	 */
-	private RecoveryLogic() {
-	}
-
-	/**
-	 * Tries to recover from the failure of the given vertex.
-	 * <p>
-	 * Everything after the initial state check runs while holding the monitor of the vertex's
-	 * {@link ExecutionGraph}.
-	 * 
-	 * @param failedVertex
-	 *        the vertex to recover, must be in state {@link ExecutionState#FAILED}
-	 * @param verticesToBeRestarted
-	 *        output map which receives every predecessor that has been asked to cancel, keyed by its ID
-	 * @param assignedVertices
-	 *        output set which receives every vertex that has been reset to {@link ExecutionState#ASSIGNED}
-	 * @return <code>true</code> if all steps succeeded, <code>false</code> if the vertex is not failed, a
-	 *         predecessor could not be canceled or a lookup cache could not be invalidated
-	 */
-	public static boolean recover(final ExecutionVertex failedVertex,
-			final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted,
-			final Set<ExecutionVertex> assignedVertices) {
-
-		// Perform initial sanity check
-		if (failedVertex.getExecutionState() != ExecutionState.FAILED) {
-			LOG.error("Vertex " + failedVertex + " is requested to be recovered, but is not failed");
-			return false;
-		}
-
-		final ExecutionGraph eg = failedVertex.getExecutionGraph();
-		synchronized (eg) {
-
-			LOG.info("Starting recovery for failed vertex " + failedVertex);
-
-			final Set<ExecutionVertex> verticesToBeCanceled = new HashSet<ExecutionVertex>();
-			findVerticesToRestart(failedVertex, verticesToBeCanceled);
-
-			// Restart all predecessors without checkpoint
-			for (final ExecutionVertex vertex : verticesToBeCanceled) {
-
-				// A finished vertex has nothing to cancel, it is only reset to its restart state
-				if (vertex.compareAndUpdateExecutionState(ExecutionState.FINISHED, getStateToUpdate(vertex))) {
-					LOG.info("Vertex " + vertex + " has already finished and will not be canceled");
-					if (vertex.getExecutionState() == ExecutionState.ASSIGNED) {
-						assignedVertices.add(vertex);
-					}
-					continue;
-				}
-
-				LOG.info(vertex + " is canceled by recovery logic");
-
-				// Register the vertex before the cancel request is issued, undo the registration on failure
-				verticesToBeRestarted.put(vertex.getID(), vertex);
-				final TaskCancelResult cancelResult = vertex.cancelTask();
-				final AbstractTaskResult.ReturnCode returnCode = cancelResult.getReturnCode();
-
-				// A task which is unknown to the task manager does not need to be canceled anymore
-				if (returnCode != AbstractTaskResult.ReturnCode.SUCCESS
-						&& returnCode != AbstractTaskResult.ReturnCode.TASK_NOT_FOUND) {
-
-					verticesToBeRestarted.remove(vertex.getID());
-					LOG.error("Unable to cancel vertex " + vertex + ": " + cancelResult.getDescription());
-					return false;
-				}
-			}
-
-			LOG.info("Starting cache invalidation");
-
-			// Invalidate the lookup caches
-			if (!invalidateReceiverLookupCaches(failedVertex, verticesToBeCanceled)) {
-				return false;
-			}
-
-			LOG.info("Cache invalidation complete");
-
-			// Restart failed vertex
-			failedVertex.updateExecutionState(getStateToUpdate(failedVertex));
-			if (failedVertex.getExecutionState() == ExecutionState.ASSIGNED) {
-				assignedVertices.add(failedVertex);
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Checks whether the given vertex is bound to a real instance.
-	 * 
-	 * @param vertex
-	 *        the vertex to check
-	 * @return <code>true</code> if the instance of the vertex's allocated resource is not a {@link DummyInstance}
-	 */
-	static boolean hasInstanceAssigned(final ExecutionVertex vertex) {
-
-		return !(vertex.getAllocatedResource().getInstance() instanceof DummyInstance);
-	}
-
-	/**
-	 * Determines the state a vertex is reset to when it must run again.
-	 * 
-	 * @param vertex
-	 *        the vertex to be reset
-	 * @return {@link ExecutionState#ASSIGNED} if the vertex has a real instance, {@link ExecutionState#CREATED}
-	 *         otherwise
-	 */
-	private static ExecutionState getStateToUpdate(final ExecutionVertex vertex) {
-
-		return hasInstanceAssigned(vertex) ? ExecutionState.ASSIGNED : ExecutionState.CREATED;
-	}
-
-	/**
-	 * Walks backwards from the failed vertex over all (transitive) predecessors and collects those which have a
-	 * real instance assigned.
-	 * <p>
-	 * Vertices are marked as visited when they are enqueued, so each vertex is expanded exactly once even if it
-	 * can be reached on several paths.
-	 * 
-	 * @param failedVertex
-	 *        the vertex the traversal starts from; the vertex itself is not added to the result
-	 * @param verticesToBeCanceled
-	 *        output set which receives the predecessors that have an instance assigned
-	 */
-	private static void findVerticesToRestart(final ExecutionVertex failedVertex,
-			final Set<ExecutionVertex> verticesToBeCanceled) {
-
-		final Queue<ExecutionVertex> verticesToTest = new ArrayDeque<ExecutionVertex>();
-		final Set<ExecutionVertex> visited = new HashSet<ExecutionVertex>();
-		verticesToTest.add(failedVertex);
-		visited.add(failedVertex);
-
-		while (!verticesToTest.isEmpty()) {
-
-			final ExecutionVertex vertex = verticesToTest.poll();
-
-			// Predecessors must be either checkpoints or need to be restarted, too
-			for (int j = 0; j < vertex.getNumberOfPredecessors(); j++) {
-				final ExecutionVertex predecessor = vertex.getPredecessor(j);
-
-				if (hasInstanceAssigned(predecessor)) {
-					verticesToBeCanceled.add(predecessor);
-				}
-
-				// Set.add returns false for known vertices, so nothing is enqueued twice
-				if (visited.add(predecessor)) {
-					verticesToTest.add(predecessor);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Asks every instance connected to the failed vertex or to one of the canceled vertices to drop the lookup
-	 * cache entries of the affected channels.
-	 * 
-	 * @param failedVertex
-	 *        the vertex which has failed
-	 * @param verticesToBeCanceled
-	 *        the predecessors which are canceled as part of the recovery
-	 * @return <code>true</code> if all instances have been notified, <code>false</code> as soon as one
-	 *         notification fails with an {@link IOException}
-	 */
-	private static boolean invalidateReceiverLookupCaches(final ExecutionVertex failedVertex,
-			final Set<ExecutionVertex> verticesToBeCanceled) {
-
-		final Map<AbstractInstance, Set<ChannelID>> entriesToInvalidate = new HashMap<AbstractInstance, Set<ChannelID>>();
-
-		collectCacheEntriesToInvalidate(failedVertex, entriesToInvalidate);
-		for (final ExecutionVertex vertex : verticesToBeCanceled) {
-			collectCacheEntriesToInvalidate(vertex, entriesToInvalidate);
-		}
-
-		for (final Map.Entry<AbstractInstance, Set<ChannelID>> entry : entriesToInvalidate.entrySet()) {
-
-			try {
-				entry.getKey().invalidateLookupCacheEntries(entry.getValue());
-			} catch (IOException ioe) {
-				LOG.error(StringUtils.stringifyException(ioe));
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Collects, grouped by instance, the IDs of the remote ends of all channels attached to the given vertex.
-	 * 
-	 * @param vertex
-	 *        the vertex whose output and input gates are inspected
-	 * @param entriesToInvalidate
-	 *        output map from instance to the channel IDs to be invalidated on that instance
-	 */
-	private static void collectCacheEntriesToInvalidate(final ExecutionVertex vertex,
-			final Map<AbstractInstance, Set<ChannelID>> entriesToInvalidate) {
-
-		final int numberOfOutputGates = vertex.getNumberOfOutputGates();
-		for (int i = 0; i < numberOfOutputGates; ++i) {
-
-			final ExecutionGate outputGate = vertex.getOutputGate(i);
-			for (int j = 0; j < outputGate.getNumberOfEdges(); ++j) {
-				collectCacheEntryForEdge(outputGate.getEdge(j), true, entriesToInvalidate);
-			}
-		}
-
-		for (int i = 0; i < vertex.getNumberOfInputGates(); ++i) {
-
-			final ExecutionGate inputGate = vertex.getInputGate(i);
-			for (int j = 0; j < inputGate.getNumberOfEdges(); ++j) {
-				collectCacheEntryForEdge(inputGate.getEdge(j), false, entriesToInvalidate);
-			}
-		}
-	}
-
-	/**
-	 * Records the channel ID at the remote end of a single edge under the instance of the remote vertex.
-	 * 
-	 * @param edge
-	 *        the edge to inspect
-	 * @param outgoing
-	 *        <code>true</code> if the edge leaves the inspected vertex (the remote end is the input side),
-	 *        <code>false</code> if it enters the vertex (the remote end is the output side)
-	 * @param entriesToInvalidate
-	 *        output map from instance to the channel IDs to be invalidated on that instance
-	 */
-	private static void collectCacheEntryForEdge(final ExecutionEdge edge, final boolean outgoing,
-			final Map<AbstractInstance, Set<ChannelID>> entriesToInvalidate) {
-
-		final ExecutionVertex connectedVertex = outgoing ? edge.getInputGate().getVertex()
-				: edge.getOutputGate().getVertex();
-		if (connectedVertex == null) {
-			LOG.error("Connected vertex is null");
-			return;
-		}
-
-		// Vertices without a real instance have no lookup cache which could be invalidated
-		final AbstractInstance instance = connectedVertex.getAllocatedResource().getInstance();
-		if (instance instanceof DummyInstance) {
-			return;
-		}
-
-		Set<ChannelID> channelIDs = entriesToInvalidate.get(instance);
-		if (channelIDs == null) {
-			channelIDs = new SerializableHashSet<ChannelID>();
-			entriesToInvalidate.put(instance, channelIDs);
-		}
-
-		channelIDs.add(outgoing ? edge.getInputChannelID() : edge.getOutputChannelID());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java
deleted file mode 100644
index 9ae5635..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.local;
-
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractExecutionListener;
-
-/**
- * Execution listener which connects a single {@link ExecutionVertex} with the {@link LocalScheduler}, so that the
- * scheduler is notified about state changes of vertices belonging to scheduled jobs.
- * <p>
- * The class declares no state or behavior of its own, everything is inherited from
- * {@link AbstractExecutionListener}. The original documentation describes this class as thread-safe.
- * 
- */
-public class LocalExecutionListener extends AbstractExecutionListener {
-
-	/**
-	 * Constructs a new local execution listener.
-	 * 
-	 * @param scheduler
-	 *        the scheduler this listener is connected with
-	 * @param executionVertex
-	 *        the execution vertex this listener is created for
-	 */
-	public LocalExecutionListener(final LocalScheduler scheduler, final ExecutionVertex executionVertex) {
-		super(scheduler, executionVertex);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
deleted file mode 100644
index b731965..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.local;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.Map;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionStage;
-import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.executiongraph.JobStatusListener;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.DeploymentManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
-import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * Scheduler which keeps all submitted jobs in an internal queue and requests the instances of a job's execution
- * stage whenever that stage is entered.
- * <p>
- * The job queue is not thread-safe by itself, so every access to it is guarded by the queue's monitor.
- */
-public class LocalScheduler extends AbstractScheduler implements JobStatusListener, ExecutionStageListener {
-
-	/**
-	 * The job queue of the scheduler. The object also serves as the lock for all accesses to the queue.
-	 */
-	private final Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
-
-	/**
-	 * Constructs a new local scheduler.
-	 * 
-	 * @param deploymentManager
-	 *        the deployment manager assigned to this scheduler
-	 * @param instanceManager
-	 *        the instance manager to be used with this scheduler
-	 */
-	public LocalScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
-		super(deploymentManager, instanceManager);
-	}
-
-	/**
-	 * Removes the job represented by the given {@link ExecutionGraph} from the job queue. Jobs are matched by
-	 * their job ID. An error is logged if the job is not part of the queue.
-	 * 
-	 * @param executionGraphToRemove
-	 *        the job to be removed
-	 */
-	void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-
-		boolean removedFromQueue = false;
-
-		synchronized (this.jobQueue) {
-
-			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-			while (it.hasNext()) {
-
-				// Field jobID of executionGraph is immutable, so no synchronization needed
-				if (it.next().getJobID().equals(executionGraphToRemove.getJobID())) {
-					it.remove();
-					removedFromQueue = true;
-					break;
-				}
-			}
-		}
-
-		if (!removedFromQueue) {
-			LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
-				+ executionGraphToRemove.getJobID() + ") to remove");
-		}
-
-		// TODO: Remove vertices from restart map
-	}
-
-
-	/**
-	 * Schedules the given job. The method first verifies for every stage that each required instance type is
-	 * known and available in sufficient number, then registers the listeners, adds the job to the queue and
-	 * requests the instances for the job's current execution stage.
-	 * 
-	 * @param executionGraph
-	 *        the job to be scheduled
-	 * @throws SchedulingException
-	 *         thrown if a required instance type is unknown, if not enough instances of a type are available or
-	 *         if requesting the instances fails; in the last case the job is removed from the queue again
-	 */
-	@Override
-	public void schedulJob(final ExecutionGraph executionGraph) throws SchedulingException {
-
-		// Get Map of all available Instance types
-		final Map<InstanceType, InstanceTypeDescription> availableInstances = getInstanceManager()
-				.getMapOfAvailableInstanceTypes();
-
-		final Iterator<ExecutionStage> stageIt = executionGraph.iterator();
-		while (stageIt.hasNext()) {
-
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
-			final ExecutionStage stage = stageIt.next();
-			stage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
-			// Iterator over required Instances
-			final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
-			while (it.hasNext()) {
-
-				final Map.Entry<InstanceType, Integer> entry = it.next();
-
-				final InstanceTypeDescription descr = availableInstances.get(entry.getKey());
-				if (descr == null) {
-					throw new SchedulingException("Unable to schedule job: No instance of type " + entry.getKey()
-							+ " available");
-				}
-
-				// The value -1 is exempt from the availability check
-				if (descr.getMaximumNumberOfAvailableInstances() != -1
-						&& descr.getMaximumNumberOfAvailableInstances() < entry.getValue().intValue()) {
-					throw new SchedulingException("Unable to schedule job: " + entry.getValue().intValue()
-							+ " instances of type " + entry.getKey() + " required, but only "
-							+ descr.getMaximumNumberOfAvailableInstances() + " are available");
-				}
-			}
-		}
-
-		// Subscribe to job status notifications
-		executionGraph.registerJobStatusListener(this);
-
-		// Register an execution listener for each vertex
-		final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
-		while (it2.hasNext()) {
-
-			final ExecutionVertex vertex = it2.next();
-			vertex.registerExecutionListener(new LocalExecutionListener(this, vertex));
-		}
-
-		// Register the scheduler as an execution stage listener
-		executionGraph.registerExecutionStageListener(this);
-
-		// Add job to the job queue (important to add job to queue before requesting instances)
-		synchronized (this.jobQueue) {
-			this.jobQueue.add(executionGraph);
-		}
-
-		// Request resources for the first stage of the job
-
-		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-		try {
-			requestInstances(executionStage);
-		} catch (InstanceException e) {
-			final String exceptionMessage = StringUtils.stringifyException(e);
-			LOG.error(exceptionMessage);
-			// The queue is shared with other threads, so the removal must hold the same lock as all other accesses
-			synchronized (this.jobQueue) {
-				this.jobQueue.remove(executionGraph);
-			}
-			throw new SchedulingException(exceptionMessage);
-		}
-	}
-
-
-	/**
-	 * Looks up a queued job by its ID.
-	 * 
-	 * @param jobID
-	 *        the ID of the job to look up
-	 * @return the execution graph of the job or <code>null</code> if no job with that ID is in the queue
-	 */
-	@Override
-	public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-
-		synchronized (this.jobQueue) {
-
-			for (final ExecutionGraph executionGraph : this.jobQueue) {
-				if (executionGraph.getJobID().equals(jobID)) {
-					return executionGraph;
-				}
-			}
-		}
-
-		return null;
-	}
-
-
-	/**
-	 * Shuts the scheduler down by discarding all queued jobs.
-	 */
-	@Override
-	public void shutdown() {
-
-		synchronized (this.jobQueue) {
-			this.jobQueue.clear();
-		}
-
-	}
-
-
-	/**
-	 * Removes a job from the queue once it has reached the state failed, finished or canceled.
-	 * 
-	 * @param executionGraph
-	 *        the job whose status has changed
-	 * @param newJobStatus
-	 *        the new status of the job
-	 * @param optionalMessage
-	 *        an optional message attached to the status change, not used by this scheduler
-	 */
-	@Override
-	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-			final String optionalMessage) {
-
-		if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
-			|| newJobStatus == InternalJobStatus.CANCELED) {
-			removeJobFromSchedule(executionGraph);
-		}
-	}
-
-
-	/**
-	 * Requests the instances for the stage which has just been entered, deploys the assigned input vertices and
-	 * starts the replay of the previous stage's checkpoints. A failed instance request is only logged, the
-	 * remaining steps are executed nevertheless.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the stage belongs to, not used by this scheduler
-	 * @param executionStage
-	 *        the stage which has been entered
-	 */
-	@Override
-	public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-
-		// Request new instances if necessary
-		try {
-			requestInstances(executionStage);
-		} catch (InstanceException e) {
-			// TODO: Handle this error correctly
-			LOG.error(StringUtils.stringifyException(e));
-		}
-
-		// Deploy the assigned vertices
-		deployAssignedInputVertices(executionStage.getExecutionGraph());
-
-		// Initialize the replay of the previous stage's checkpoints
-		replayCheckpointsFromPreviousStage(executionStage.getExecutionGraph());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java
deleted file mode 100644
index 1d37edc..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.queue;
-
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractExecutionListener;
-
-/**
- * Execution listener which connects a single {@link ExecutionVertex} with the {@link QueueScheduler}, so that the
- * scheduler is notified about state changes of vertices belonging to scheduled jobs.
- * <p>
- * The class declares no state or behavior of its own, everything is inherited from
- * {@link AbstractExecutionListener}. The original documentation describes this class as thread-safe.
- * 
- */
-public final class QueueExecutionListener extends AbstractExecutionListener {
-
-	/**
-	 * Creates the listener for one vertex of a job handled by the queue scheduler.
-	 * 
-	 * @param scheduler
-	 *        the queue scheduler which receives the notifications
-	 * @param executionVertex
-	 *        the execution vertex whose state changes are reported
-	 */
-	public QueueExecutionListener(final QueueScheduler scheduler, final ExecutionVertex executionVertex) {
-		super(scheduler, executionVertex);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
deleted file mode 100644
index cd76f04..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.queue;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.Map;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionStage;
-import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.executiongraph.JobStatusListener;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.DeploymentManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
-import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * The queue scheduler maintains a queue of all submitted jobs and executes one job at a time.
- * <p>
- * The job queue is not thread-safe by itself, so every access to it is guarded by the queue's monitor.
- * 
- */
-public class QueueScheduler extends AbstractScheduler implements JobStatusListener, ExecutionStageListener {
-
-	/**
-	 * The job queue where all submitted jobs go to. The object also serves as the lock for all accesses to the
-	 * queue.
-	 */
-	private final Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
-
-	/**
-	 * Constructs a new queue scheduler.
-	 * 
-	 * @param deploymentManager
-	 *        the deployment manager assigned to this scheduler
-	 * @param instanceManager
-	 *        the instance manager to be used with this scheduler
-	 */
-	public QueueScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
-		super(deploymentManager, instanceManager);
-	}
-
-	/**
-	 * Removes the job represented by the given {@link ExecutionGraph} from the scheduler. Jobs are matched by
-	 * their job ID. An error is logged if the job is not part of the queue.
-	 * 
-	 * @param executionGraphToRemove
-	 *        the job to be removed
-	 */
-	void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-
-		boolean removedFromQueue = false;
-
-		synchronized (this.jobQueue) {
-
-			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-			while (it.hasNext()) {
-
-				if (it.next().getJobID().equals(executionGraphToRemove.getJobID())) {
-					it.remove();
-					removedFromQueue = true;
-					break;
-				}
-			}
-		}
-
-		if (!removedFromQueue) {
-			LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
-				+ executionGraphToRemove.getJobID() + ") to remove");
-		}
-	}
-
-
-	/**
-	 * Schedules the given job. The method first verifies for every stage that each required instance type is
-	 * known and available in sufficient number, then registers the listeners, adds the job to the queue and
-	 * requests the instances for the job's current execution stage.
-	 * 
-	 * @param executionGraph
-	 *        the job to be scheduled
-	 * @throws SchedulingException
-	 *         thrown if a required instance type is unknown, if not enough instances of a type are available or
-	 *         if requesting the instances fails; in the last case the job is removed from the queue again
-	 */
-	@Override
-	public void schedulJob(final ExecutionGraph executionGraph) throws SchedulingException {
-
-		// Get Map of all available Instance types
-		final Map<InstanceType, InstanceTypeDescription> availableInstances = getInstanceManager()
-				.getMapOfAvailableInstanceTypes();
-
-		final Iterator<ExecutionStage> stageIt = executionGraph.iterator();
-		while (stageIt.hasNext()) {
-
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
-			final ExecutionStage stage = stageIt.next();
-			stage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
-			// Iterator over required Instances
-			final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
-			while (it.hasNext()) {
-
-				final Map.Entry<InstanceType, Integer> entry = it.next();
-
-				final InstanceTypeDescription descr = availableInstances.get(entry.getKey());
-				if (descr == null) {
-					throw new SchedulingException("Unable to schedule job: No instance of type " + entry.getKey()
-							+ " available");
-				}
-
-				// The value -1 is exempt from the availability check
-				if (descr.getMaximumNumberOfAvailableInstances() != -1
-						&& descr.getMaximumNumberOfAvailableInstances() < entry.getValue().intValue()) {
-					throw new SchedulingException("Unable to schedule job: " + entry.getValue().intValue()
-							+ " instances of type " + entry.getKey() + " required, but only "
-							+ descr.getMaximumNumberOfAvailableInstances() + " are available");
-				}
-			}
-		}
-
-		// Subscribe to job status notifications
-		executionGraph.registerJobStatusListener(this);
-
-		// Register execution listener for each vertex
-		final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
-		while (it2.hasNext()) {
-
-			final ExecutionVertex vertex = it2.next();
-			vertex.registerExecutionListener(new QueueExecutionListener(this, vertex));
-		}
-
-		// Register the scheduler as an execution stage listener
-		executionGraph.registerExecutionStageListener(this);
-
-		// Add job to the job queue (important to add job to queue before requesting instances)
-		synchronized (this.jobQueue) {
-			this.jobQueue.add(executionGraph);
-		}
-
-		// Request resources for the first stage of the job
-
-		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-		try {
-			requestInstances(executionStage);
-		} catch (InstanceException e) {
-			final String exceptionMessage = StringUtils.stringifyException(e);
-			LOG.error(exceptionMessage);
-			// The queue is shared with other threads, so the removal must hold the same lock as all other accesses
-			synchronized (this.jobQueue) {
-				this.jobQueue.remove(executionGraph);
-			}
-			throw new SchedulingException(exceptionMessage);
-		}
-	}
-
-
-	/**
-	 * Looks up a queued job by its ID.
-	 * 
-	 * @param jobID
-	 *        the ID of the job to look up
-	 * @return the execution graph of the job or <code>null</code> if no job with that ID is in the queue
-	 */
-	@Override
-	public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-
-		synchronized (this.jobQueue) {
-
-			for (final ExecutionGraph executionGraph : this.jobQueue) {
-				if (executionGraph.getJobID().equals(jobID)) {
-					return executionGraph;
-				}
-			}
-		}
-
-		return null;
-	}
-
-
-	/**
-	 * Shuts the scheduler down by discarding all queued jobs.
-	 */
-	@Override
-	public void shutdown() {
-
-		synchronized (this.jobQueue) {
-			this.jobQueue.clear();
-		}
-
-	}
-
-
-	/**
-	 * Removes a job from the queue once it has reached the state failed, finished or canceled.
-	 * 
-	 * @param executionGraph
-	 *        the job whose status has changed
-	 * @param newJobStatus
-	 *        the new status of the job
-	 * @param optionalMessage
-	 *        an optional message attached to the status change, not used by this scheduler
-	 */
-	@Override
-	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-			final String optionalMessage) {
-
-		if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
-			|| newJobStatus == InternalJobStatus.CANCELED) {
-			removeJobFromSchedule(executionGraph);
-		}
-	}
-
-
-	/**
-	 * Requests the instances for the stage which has just been entered and deploys the assigned input vertices.
-	 * A failed instance request is only logged, the deployment is attempted nevertheless.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the stage belongs to, not used by this scheduler
-	 * @param executionStage
-	 *        the stage which has been entered
-	 */
-	@Override
-	public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-
-		// Request new instances if necessary
-		try {
-			requestInstances(executionStage);
-		} catch (InstanceException e) {
-			// TODO: Handle error correctly
-			LOG.error(StringUtils.stringifyException(e));
-		}
-
-		// Deploy the assigned vertices
-		deployAssignedInputVertices(executionStage.getExecutionGraph());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
index eea78d8..bbef991 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
@@ -37,7 +37,7 @@ import eu.stratosphere.util.StringUtils;
 
 /**
  * The input split manager is responsible for serving input splits to {@link AbstractInputTask} objects at runtime.
- * Before passed on to the {@link AbstractScheduler}, an {@link ExecutionGraph} is registered with the input split
+ * Before being passed on to the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler}, an {@link ExecutionGraph} is registered with the input split
  * manager and all included input vertices of the graph register their generated input splits with the manager. Each
  * type of input split can be assigned to a specific {@link InputSplitAssigner} which is loaded by the input split
  * manager at runtime.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 85df81a..3717fbf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -16,6 +16,7 @@ package eu.stratosphere.nephele.jobmanager.splitassigner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import eu.stratosphere.nephele.instance.Instance;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -23,7 +24,6 @@ import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.core.io.LocatableInputSplit;
 import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
 import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 
@@ -115,7 +115,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 			return null;
 		}
 
-		final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
+		final Instance instance = vertex.getAllocatedResource().getInstance();
 		if (instance == null) {
 			LOG.error("Instance is null, returning random split");
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
index c830a6f..7647fae 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
@@ -21,16 +21,16 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
+import eu.stratosphere.nephele.instance.Instance;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import eu.stratosphere.core.io.LocatableInputSplit;
-import eu.stratosphere.nephele.instance.AbstractInstance;
 
 /**
  * The locatable input split list stores the locatable input splits for an input vertex that are still expected to be
  * consumed. Besides simply storing the splits, the locatable input split list also computes the distance all
- * {@link AbstractInstance} objects which request an input split and its nearest storage location with respect to the
+ * between all {@link eu.stratosphere.nephele.instance.Instance} objects which request an input split and its nearest storage location with respect to the
  * underlying network topology. That way input splits are always given to consuming vertices in a way that data locality
  * is preserved as well as possible.
  * <p>
@@ -50,13 +50,13 @@ public final class LocatableInputSplitList {
 	private Set<LocatableInputSplit> masterSet = new HashSet<LocatableInputSplit>();
 
 	/**
-	 * The map caching the specific file input split lists for each {@link AbstractInstance}.
+	 * The map caching the specific file input split lists for each {@link eu.stratosphere.nephele.instance.Instance}.
 	 */
-	private Map<AbstractInstance, Queue<QueueElem>> instanceMap = new HashMap<AbstractInstance, Queue<QueueElem>>();
+	private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
 
 	/**
 	 * This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
-	 * {@link AbstractInstance}.
+	 * {@link eu.stratosphere.nephele.instance.Instance}.
 	 * 
 	 */
 	private final class QueueElem implements Comparable<QueueElem> {
@@ -120,7 +120,7 @@ public final class LocatableInputSplitList {
 	/**
 	 * Returns the next locatable input split to be consumed by the given instance. The returned input split is selected
 	 * in a
-	 * way that the distance between the split's storage location and the requesting {@link AbstractInstance} is as
+	 * way that the distance between the split's storage location and the requesting {@link eu.stratosphere.nephele.instance.Instance} is as
 	 * short as possible.
 	 * 
 	 * @param instance
@@ -128,7 +128,7 @@ public final class LocatableInputSplitList {
 	 * @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
 	 *         already been consumed.
 	 */
-	synchronized LocatableInputSplit getNextInputSplit(final AbstractInstance instance) {
+	synchronized LocatableInputSplit getNextInputSplit(final Instance instance) {
 
 		final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
 
@@ -157,16 +157,16 @@ public final class LocatableInputSplitList {
 	}
 
 	/**
-	 * Returns a list of locatable input splits specifically ordered for the given {@link AbstractInstance}. When the
+	 * Returns a list of locatable input splits specifically ordered for the given {@link eu.stratosphere.nephele.instance.Instance}. When the
 	 * list is initially created, it contains all the unconsumed located input splits at that point in time, ascendingly
 	 * ordered
-	 * by the minimum distance between the input splits' storage locations and the given {@link AbstractInstance}.
+	 * by the minimum distance between the input splits' storage locations and the given {@link eu.stratosphere.nephele.instance.Instance}.
 	 * 
 	 * @param instance
 	 *        the instance for which the locatable input split list has been computed
 	 * @return the list of file input splits ordered specifically for the given instance
 	 */
-	private Queue<QueueElem> getInstanceSplitList(final AbstractInstance instance) {
+	private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
 
 		Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
 		if (instanceSplitList == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index 938fb48..7894334 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -16,6 +16,7 @@ package eu.stratosphere.nephele.jobmanager.splitassigner.file;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import eu.stratosphere.nephele.instance.Instance;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -23,7 +24,6 @@ import eu.stratosphere.core.fs.FileInputSplit;
 import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
 import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitAssigner;
 import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.nephele.template.AbstractInvokable;
@@ -117,7 +117,7 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
 			return null;
 		}
 
-		final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
+		final Instance instance = vertex.getAllocatedResource().getInstance();
 		if (instance == null) {
 			LOG.error("Instance is null, returning random split");
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
index db84a91..ae9898a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
@@ -21,15 +21,15 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
+import eu.stratosphere.nephele.instance.Instance;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.nephele.instance.AbstractInstance;
 
 /**
  * The file input split list stores the file input splits for an input vertex that are still expected to be consumed.
- * Besides simply storing the splits, the file input split list also computes the distance all {@link AbstractInstance}
+ * Besides simply storing the splits, the file input split list also computes the distance between all {@link eu.stratosphere.nephele.instance.Instance}
  * objects which request a input split and its nearest storage location with respect to the underlying network topology.
  * That way input splits are always given to consuming vertices in a way that data locality is preserved as well as
  * possible.
@@ -50,13 +50,13 @@ public final class FileInputSplitList {
 	private Set<FileInputSplit> masterSet = new HashSet<FileInputSplit>();
 
 	/**
-	 * The map caching the specific file input split lists for each {@link AbstractInstance}.
+	 * The map caching the specific file input split lists for each {@link eu.stratosphere.nephele.instance.Instance}.
 	 */
-	private Map<AbstractInstance, Queue<QueueElem>> instanceMap = new HashMap<AbstractInstance, Queue<QueueElem>>();
+	private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
 
 	/**
 	 * This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
-	 * {@link AbstractInstance}.
+	 * {@link eu.stratosphere.nephele.instance.Instance}.
 	 * 
 	 */
 	private final class QueueElem implements Comparable<QueueElem> {
@@ -119,7 +119,7 @@ public final class FileInputSplitList {
 
 	/**
 	 * Returns the next file input split to be consumed by the given instance. The returned input split is selected in a
-	 * way that the distance between the split's storage location and the requesting {@link AbstractInstance} is as
+	 * way that the distance between the split's storage location and the requesting {@link eu.stratosphere.nephele.instance.Instance} is as
 	 * short as possible.
 	 * 
 	 * @param instance
@@ -127,7 +127,7 @@ public final class FileInputSplitList {
 	 * @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
 	 *         already been consumed.
 	 */
-	synchronized FileInputSplit getNextInputSplit(final AbstractInstance instance) {
+	synchronized FileInputSplit getNextInputSplit(final Instance instance) {
 
 		final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
 
@@ -156,15 +156,15 @@ public final class FileInputSplitList {
 	}
 
 	/**
-	 * Returns a list of file input splits specifically ordered for the given {@link AbstractInstance}. When the list is
+	 * Returns a list of file input splits specifically ordered for the given {@link eu.stratosphere.nephele.instance.Instance}. When the list is
 	 * initially created, it contains all the unconsumed file input splits at that point in time, ascendingly ordered by
-	 * the minimum distance between the input splits' storage locations and the given {@link AbstractInstance}.
+	 * the minimum distance between the input splits' storage locations and the given {@link eu.stratosphere.nephele.instance.Instance}.
 	 * 
 	 * @param instance
 	 *        the instance for which the file input split list has been computed
 	 * @return the list of file input splits ordered specifically for the given instance
 	 */
-	private Queue<QueueElem> getInstanceSplitList(final AbstractInstance instance) {
+	private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
 
 		Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
 		if (instanceSplitList == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
index 374656b..fab720d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
@@ -445,9 +445,8 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
 			groupVertexID.read(in);
 			final ManagementGroupVertex groupVertex = this.getGroupVertexByID(groupVertexID);
 			final String instanceName = StringRecord.readString(in);
-			final String instanceType = StringRecord.readString(in);
 			final int indexInGroup = in.readInt();
-			final ManagementVertex vertex = new ManagementVertex(groupVertex, vertexID, instanceName, instanceType, indexInGroup);
+			final ManagementVertex vertex = new ManagementVertex(groupVertex, vertexID, instanceName, indexInGroup);
 			vertex.read(in);
 		}
 
@@ -523,7 +522,6 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
 			managementVertex.getID().write(out);
 			managementVertex.getGroupVertex().getID().write(out);
 			StringRecord.writeString(out, managementVertex.getInstanceName());
-			StringRecord.writeString(out, managementVertex.getInstanceType());
 			out.writeInt(managementVertex.getIndexInGroup());
 			managementVertex.write(out);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
index 639b1e9..eaececc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
@@ -65,11 +65,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 	private String instanceName;
 
 	/**
-	 * The type of the instance the vertex represented by this management vertex currently runs on.
-	 */
-	private String instanceType;
-
-	/**
 	 * The index of this vertex in the management group vertex it belongs to.
 	 */
 	private final int indexInGroup;
@@ -88,19 +83,14 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 	 *        the ID of the new management vertex
 	 * @param instanceName
 	 *        the name of the instance the vertex represented by this new management vertex currently runs on
-	 * @param instanceType
-	 *        the type of the instance the vertex represented by this new management vertex currently runs on
-	 * @param checkpointState
-	 *        the state of the vertex's checkpoint
 	 * @param indexInGroup
 	 *        the index of this vertex in the management group vertex it belongs to
 	 */
 	public ManagementVertex(final ManagementGroupVertex groupVertex, final ManagementVertexID id,
-			final String instanceName, final String instanceType, final int indexInGroup) {
+			final String instanceName, final int indexInGroup) {
 		this.groupVertex = groupVertex;
 		this.id = id;
 		this.instanceName = instanceName;
-		this.instanceType = instanceType;
 
 		this.indexInGroup = indexInGroup;
 
@@ -132,15 +122,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 	}
 
 	/**
-	 * Returns the type of the instance the vertex represented by this management vertex currently runs on.
-	 * 
-	 * @return the type of the instance the vertex represented by this management vertex currently runs on
-	 */
-	public String getInstanceType() {
-		return this.instanceType;
-	}
-
-	/**
 	 * Returns the number of input gates this management vertex contains.
 	 * 
 	 * @return the number of input gates this management vertex contains
@@ -276,16 +257,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 		this.instanceName = instanceName;
 	}
 
-	/**
-	 * Sets the type of instance this vertex currently runs on.
-	 * 
-	 * @param instanceType
-	 *        the type of instance this vertex currently runs on
-	 */
-	public void setInstanceType(final String instanceType) {
-		this.instanceType = instanceType;
-	}
-
 	public void setOptMessage(final String optMessage) {
 		this.optMessage = optMessage;
 	}
@@ -294,7 +265,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 		return this.optMessage;
 	}
 
-
 	@Override
 	public void read(final DataInput in) throws IOException {
 
@@ -314,7 +284,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 		}
 
 		this.instanceName = StringRecord.readString(in);
-		this.instanceType = StringRecord.readString(in);
 	}
 
 
@@ -331,7 +300,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 		out.writeInt(this.outputGates.size());
 
 		StringRecord.writeString(out, this.instanceName);
-		StringRecord.writeString(out, this.instanceType);
 	}
 	
 	@Override
@@ -351,7 +319,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(this.toString()) + "\",");
 		json.append("\"vertexstatus\": \"" + this.getExecutionState() + "\",");
 		json.append("\"vertexinstancename\": \"" + this.getInstanceName() + "\",");
-		json.append("\"vertexinstancetype\": \"" + this.getInstanceType() + "\"");
 		json.append("}");
 		return json.toString();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
index 35979dd..cb08c3a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
@@ -221,6 +221,7 @@ public class NetUtils {
 	 * @return InputStream for reading from the socket.
 	 * @throws IOException
 	 */
+	@SuppressWarnings("resource")
 	public static InputStream getInputStream(Socket socket, long timeout) throws IOException {
 		return (socket.getChannel() == null) ? socket.getInputStream() : new SocketInputStream(socket, timeout);
 	}
@@ -266,6 +267,7 @@ public class NetUtils {
 	 * @return OutputStream for writing to the socket.
 	 * @throws IOException
 	 */
+	@SuppressWarnings("resource")
 	public static OutputStream getOutputStream(Socket socket, long timeout) throws IOException {
 		return (socket.getChannel() == null) ? socket.getOutputStream() : new SocketOutputStream(socket, timeout);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
index 7d0c980..81b4134 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
@@ -23,7 +23,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
 import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
 import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.instance.DummyInstance;
 import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
 import eu.stratosphere.nephele.profiling.impl.types.InternalInstanceProfilingData;
@@ -75,7 +75,7 @@ public class JobProfilingData {
 
 	public InstanceSummaryProfilingEvent getInstanceSummaryProfilingData(long timestamp) {
 
-		final Set<AbstractInstance> tempSet = new HashSet<AbstractInstance>();
+		final Set<Instance> tempSet = new HashSet<Instance>();
 		// First determine the number of allocated instances in the current stage
 		final ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(this.executionGraph, true,
 			this.executionGraph.getIndexOfCurrentExecutionStage());
@@ -84,7 +84,7 @@ public class JobProfilingData {
 			final ExecutionGroupVertex groupVertex = it.next();
 			for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); i++) {
 				final ExecutionVertex executionVertex = groupVertex.getGroupMember(i);
-				final AbstractInstance instance = executionVertex.getAllocatedResource().getInstance();
+				final Instance instance = executionVertex.getAllocatedResource().getInstance();
 				if(!(instance instanceof DummyInstance)) {
 					tempSet.add(instance);
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
index 59ec15d..c731285 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
@@ -15,13 +15,10 @@ package eu.stratosphere.nephele.protocols;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import eu.stratosphere.core.io.StringRecord;
 import eu.stratosphere.nephele.event.job.AbstractEvent;
 import eu.stratosphere.nephele.event.job.RecentJobEvent;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.managementgraph.ManagementGraph;
 import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
@@ -104,19 +101,6 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
 	void killInstance(StringRecord instanceName) throws IOException;
 
 	/**
-	 * Returns a map of all instance types which are currently available to Nephele. The map contains a description of
-	 * the hardware characteristics for each instance type as provided in the configuration file. Moreover, it contains
-	 * the actual hardware description as reported by task managers running on the individual instances. If available,
-	 * the map also contains the maximum number instances Nephele can allocate of each instance type (i.e. if no other
-	 * job occupies instances).
-	 * 
-	 * @return a list of all instance types available to Nephele
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the list
-	 */
-	Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() throws IOException;
-
-	/**
 	 * Triggers all task managers involved in processing the job with the given job ID to write the utilization of
 	 * their read and write buffers to their log files. This method is primarily for debugging purposes.
 	 * 
@@ -126,4 +110,11 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
 	 *         throws if an error occurs while transmitting the request
 	 */
 	void logBufferUtilization(JobID jobID) throws IOException;
+
+	/**
+	 * Returns the number of available slots among the registered task managers.
+	 * @return the number of available slots
+	 * @throws IOException thrown if an error occurs while transmitting the request
+	 */
+	int getAvailableSlots() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
index 8cd5e26..5070b51 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
@@ -19,6 +19,8 @@ import eu.stratosphere.core.protocols.VersionedProtocol;
 import eu.stratosphere.nephele.instance.HardwareDescription;
 import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
 import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
+import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
+import eu.stratosphere.nephele.types.IntegerRecord;
 
 /**
  * The job manager protocol is implemented by the job manager and offers functionality
@@ -33,12 +35,23 @@ public interface JobManagerProtocol extends VersionedProtocol {
 	 * 
 	 * @param instanceConnectionInfo
 	 *        the information the job manager requires to connect to the instance's task manager
-	 * @param hardwareDescription
-	 *        a hardware description with details on the instance's compute resources.
 	 * @throws IOException
 	 *         thrown if an error occurs during this remote procedure call
 	 */
-	void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription)
+	void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo)
+			throws IOException;
+
+	/**
+	 * Registers a task manager at the JobManager.
+	 *
+	 * @param instanceConnectionInfo the information the job manager requires to connect to the instance's task manager
+	 * @param hardwareDescription a hardware description with details on the instance's compute resources.
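+	 * @param numberOfSlots the number of slots of the task manager
+	 * @return whether the task manager was successfully registered
+	 * @throws IOException thrown if an error occurs during this remote procedure call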
+	 */
+	RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
+						HardwareDescription hardwareDescription,IntegerRecord numberOfSlots)
 			throws IOException;
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
index ccbc64a..85432eb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
@@ -75,6 +75,7 @@ public abstract class ChannelAccess<T, R extends IORequest>
 		this.requestQueue = requestQueue;
 		
 		try {
+			@SuppressWarnings("resource")
 			RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
 			this.fileChannel = file.getChannel();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
index a8fe096..8b20c75 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
@@ -69,16 +69,29 @@ public interface MemoryManager {
 	 * @return The size of the pages handled by the memory manager.
 	 */
 	int getPageSize();
+
+	/**
+	 * Returns the total size of the memory managed by this memory manager.
+	 * @return the total size of the managed memory
+	 */
+	long getMemorySize();
 	
 	/**
 	 * Computes to how many pages the given number of bytes corresponds. If the given number of bytes is not an
 	 * exact multiple of a page size, the result is rounded down, such that a portion of the memory (smaller
 	 * than the page size) is not included.
 	 * 
-	 * @param numBytes The number of bytes to convert to a page count.
+	 * @param fraction the fraction of the total memory per slot
 	 * @return The number of pages to which 
 	 */
-	int computeNumberOfPages(long numBytes);
+	int computeNumberOfPages(double fraction);
+
+	/**
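+	 * Computes the memory size that corresponds to the given fraction per slot.
+	 * @param fraction the fraction of the total memory per slot
+	 * @return the memory size corresponding to the given fraction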
+	 */
+	long computeMemorySize(double fraction);
 	
 	/**
 	 * Rounds the given value down to a multiple of the memory manager's page size.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
index 8bc7b13..d4a2b36 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
@@ -66,6 +66,13 @@ public class DefaultMemoryManager implements MemoryManager {
 	
 	private boolean isShutDown;				// flag whether the close() has already been invoked.
 
+	/**
+	 * Number of slots of the task manager
+	 */
+	private final int numberOfSlots;
+
+	private final long memorySize;
+
 	// ------------------------------------------------------------------------
 	// Constructors / Destructors
 	// ------------------------------------------------------------------------
@@ -75,8 +82,8 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * 
 	 * @param memorySize The total size of the memory to be managed by this memory manager.
 	 */
-	public DefaultMemoryManager(long memorySize) {
-		this(memorySize, DEFAULT_PAGE_SIZE);
+	public DefaultMemoryManager(long memorySize, int numberOfSlots) {
+		this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
 	}
 
 	/**
@@ -85,7 +92,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * @param memorySize The total size of the memory to be managed by this memory manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
 	 */
-	public DefaultMemoryManager(long memorySize, int pageSize) {
+	public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {
 		// sanity checks
 		if (memorySize <= 0) {
 			throw new IllegalArgumentException("Size of total memory must be positive.");
@@ -97,6 +104,10 @@ public class DefaultMemoryManager implements MemoryManager {
 			// not a power of two
 			throw new IllegalArgumentException("The given page size is not a power of two.");
 		}
+
+		this.memorySize = memorySize;
+
+		this.numberOfSlots = numberOfSlots;
 		
 		// assign page size and bit utilities
 		this.pageSize = pageSize;
@@ -348,8 +359,18 @@ public class DefaultMemoryManager implements MemoryManager {
 	}
 
 	@Override
-	public int computeNumberOfPages(long numBytes) {
-		return getNumPages(numBytes);
+	public long getMemorySize() {
+		return this.memorySize;
+	}
+
+	@Override
+	public int computeNumberOfPages(double fraction) {
+		return getRelativeNumPages(fraction);
+	}
+
+	@Override
+	public long computeMemorySize(double fraction) {
+		return (long) this.pageSize * computeNumberOfPages(fraction); // widen before multiplying: int * int would overflow once the result reaches 2 GiB
+	}
 
 	@Override
@@ -371,6 +392,14 @@ public class DefaultMemoryManager implements MemoryManager {
 			throw new IllegalArgumentException("The given number of bytes correstponds to more than MAX_INT pages.");
 		}
 	}
+
+	/** Pages for {@code fraction} of one slot's share: totalNumPages * fraction / numberOfSlots, truncated by the int cast. */
+	private final int getRelativeNumPages(double fraction){
+		if(fraction < 0){
+			throw new IllegalArgumentException("The fraction of memory to allocate must not be negative.");
+		}
+		return (int)(this.totalNumPages * fraction / this.numberOfSlots); // NOTE(review): numberOfSlots is not range-checked in this patch; 0 with a positive fraction yields Integer.MAX_VALUE
+	}
 	
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index ef0f6ab..5966cf9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -45,6 +45,10 @@ import eu.stratosphere.nephele.ExecutionMode;
 import eu.stratosphere.runtime.io.network.LocalConnectionManager;
 import eu.stratosphere.runtime.io.network.NetworkConnectionManager;
 import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
+import eu.stratosphere.nephele.instance.Hardware;
+import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
+import eu.stratosphere.nephele.types.IntegerRecord;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -148,7 +152,9 @@ public class TaskManager implements TaskOperationProtocol {
 
 	private final IOManager ioManager;
 
-	private static HardwareDescription hardwareDescription = null;
+	private final HardwareDescription hardwareDescription;
+
+	private final int numberOfSlots;
 
 	private final Thread heartbeatThread;
 	
@@ -156,10 +162,10 @@ public class TaskManager implements TaskOperationProtocol {
 	
 	/** Stores whether the task manager has already been shut down. */
 	private volatile boolean shutdownComplete;
-
+	
 	/**
 	 * Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
-	 * receive an initial configuration. All parameters are obtained from the
+	 * receive an initial configuration. All parameters are obtained from the 
 	 * {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
 	 */
 	public TaskManager(ExecutionMode executionMode) throws Exception {
@@ -169,30 +175,31 @@ public class TaskManager implements TaskOperationProtocol {
 		LOG.info("Execution mode: " + executionMode);
 
 		// IMPORTANT! At this point, the GlobalConfiguration must have been read!
-
+		
 		final InetSocketAddress jobManagerAddress;
 		{
 			LOG.info("Reading location of job manager from configuration");
-
+			
 			final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 			final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
+			
 			if (address == null) {
 				throw new Exception("Job manager address not configured in the GlobalConfiguration.");
 			}
-
+	
 			// Try to convert configured address to {@link InetAddress}
 			try {
 				final InetAddress tmpAddress = InetAddress.getByName(address);
 				jobManagerAddress = new InetSocketAddress(tmpAddress, port);
-			} catch (UnknownHostException e) {
+			}
+			catch (UnknownHostException e) {
 				LOG.fatal("Could not resolve JobManager host name.");
 				throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
 			}
-
+			
 			LOG.info("Connecting to JobManager at: " + jobManagerAddress);
 		}
-
+		
 		// Create RPC connection to the JobManager
 		try {
 			this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
@@ -200,7 +207,7 @@ public class TaskManager implements TaskOperationProtocol {
 			LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
 			throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
 		}
-
+		
 		int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
 		int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1);
 		if (ipcPort == -1) {
@@ -209,16 +216,17 @@ public class TaskManager implements TaskOperationProtocol {
 		if (dataPort == -1) {
 			dataPort = getAvailablePort();
 		}
-
+		
 		// Determine our own public facing address and start the server
 		{
 			final InetAddress taskManagerAddress;
 			try {
 				taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				throw new RuntimeException("The TaskManager failed to determine its own network address.", e);
 			}
-
+			
 			this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
 			LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
 
@@ -231,7 +239,7 @@ public class TaskManager implements TaskOperationProtocol {
 				throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
 			}
 		}
-
+		
 		// Try to create local stub of the global input split provider
 		try {
 			this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
@@ -258,21 +266,19 @@ public class TaskManager implements TaskOperationProtocol {
 
 		// Load profiler if it should be used
 		if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
-
+			
 			final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY,
-					"eu.stratosphere.nephele.profiling.impl.TaskManagerProfilerImpl");
-
+				"eu.stratosphere.nephele.profiling.impl.TaskManagerProfilerImpl");
+			
 			this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(),
-					this.localInstanceConnectionInfo);
-
+				this.localInstanceConnectionInfo);
+			
 			if (this.profiler == null) {
 				LOG.error("Cannot find class name for the profiler.");
-			}
-			else {
+			} else {
 				LOG.info("Profiling of jobs is enabled.");
 			}
-		}
-		else {
+		} else {
 			this.profiler = null;
 			LOG.info("Profiling of jobs is disabled.");
 		}
@@ -282,10 +288,11 @@ public class TaskManager implements TaskOperationProtocol {
 				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
 
 		checkTempDirs(tmpDirPaths);
-
+		
 		final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
 
+		// Initialize network buffer pool
 		int numBuffers = GlobalConfiguration.getInteger(
 				ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
 				ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
@@ -333,6 +340,8 @@ public class TaskManager implements TaskOperationProtocol {
 
 		{
 			HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
+			numberOfSlots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+					Hardware.getNumberCPUCores());
 
 			// Check whether the memory size has been explicitly configured. if so that overrides the default mechanism
 			// of taking as much as is mentioned in the hardware description
@@ -341,29 +350,30 @@ public class TaskManager implements TaskOperationProtocol {
 			if (memorySize > 0) {
 				// manually configured memory size. override the value in the hardware config
 				resources = HardwareDescriptionFactory.construct(resources.getNumberOfCPUCores(),
-						resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L);
+					resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L);
 			}
 			this.hardwareDescription = resources;
 
 			// Initialize the memory manager
 			LOG.info("Initializing memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) + " megabytes of memory. " +
 					"Page size is " + pageSize + " bytes.");
-
+			
 			try {
 				@SuppressWarnings("unused")
 				final boolean lazyAllocation = GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
-						ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
-
-				this.memoryManager = new DefaultMemoryManager(resources.getSizeOfFreeMemory(), pageSize);
+					ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
+				
+				this.memoryManager = new DefaultMemoryManager(resources.getSizeOfFreeMemory(), this.numberOfSlots,
+						pageSize);
 			} catch (Throwable t) {
 				LOG.fatal("Unable to initialize memory manager with " + (resources.getSizeOfFreeMemory() >>> 20)
-						+ " megabytes of memory.", t);
+					+ " megabytes of memory.", t);
 				throw new Exception("Unable to initialize memory manager.", t);
 			}
 		}
 
 		this.ioManager = new IOManager(tmpDirPaths);
-
+		
 		this.heartbeatThread = new Thread() {
 			@Override
 			public void run() {
@@ -510,19 +520,33 @@ public class TaskManager implements TaskOperationProtocol {
 						ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
 						ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
 
-		while (!shutdownStarted.get()) {
-			// send heart beat
-			try {
-				LOG.debug("heartbeat");
-				this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo, this.hardwareDescription);
-			} catch (IOException e) {
-				if (shutdownStarted.get()) {
+		try {
+			while(!shutdownStarted.get()){
+				RegisterTaskManagerResult result  = this.jobManager.registerTaskManager(this
+								.localInstanceConnectionInfo,this.hardwareDescription,
+						new IntegerRecord(this.numberOfSlots));
+
+				if(result.getReturnCode() == RegisterTaskManagerResult.ReturnCode.SUCCESS){
 					break;
-				} else {
-					LOG.error("Sending the heart beat caused an exception: " + e.getMessage(), e);
+				}
+
+				try{
+					Thread.sleep(50);
+				}catch(InterruptedException e){
+					if (!shutdownStarted.get()) {
+						LOG.error("TaskManager register task manager loop was interrupted without shutdown.");
+					}
 				}
 			}
-			
+
+		} catch (IOException e) {
+			if(!shutdownStarted.get()){
+				LOG.error("Registering task manager caused an exception: " + e.getMessage(), e);
+			}
+			return;
+		}
+
+		while (!shutdownStarted.get()) {
 			// sleep until the next heart beat
 			try {
 				Thread.sleep(interval);
@@ -532,9 +556,22 @@ public class TaskManager implements TaskOperationProtocol {
 					LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
 				}
 			}
+
+			// send heart beat
+			try {
+				LOG.debug("heartbeat");
+				this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo);
+			} catch (IOException e) {
+				if (shutdownStarted.get()) {
+					break;
+				} else {
+					LOG.error("Sending the heart beat caused an exception: " + e.getMessage(), e);
+				}
+			}
 		}
 	}
 
+	
 	/**
 	 * The states of address detection mechanism.
 	 * There is only a state transition if the current state failed to determine the address.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java
new file mode 100644
index 0000000..b396edd
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java
@@ -0,0 +1,50 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.taskmanager.transferenvelope;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.util.EnumUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Result of a task manager registration request; its only state is a {@link ReturnCode}. */
+public class RegisterTaskManagerResult implements IOReadableWritable {
+	public enum ReturnCode{
+		SUCCESS, FAILURE
+	};
+
+	/** Creates a result whose return code is {@link ReturnCode#SUCCESS}. */
+	public RegisterTaskManagerResult(){
+		this.returnCode = ReturnCode.SUCCESS;
+	}
+
+	public RegisterTaskManagerResult(ReturnCode returnCode){
+		this.returnCode = returnCode;
+	}
+
+	private ReturnCode returnCode; // not final: read(DataInput) overwrites it
+	public ReturnCode getReturnCode() { return this.returnCode; }
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		EnumUtils.writeEnum(out, this.returnCode); // the return code is the only state written
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
index 09df691..9f6542b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
@@ -33,8 +33,6 @@ public class NetworkNode implements IOReadableWritable {
 
 	private final List<NetworkNode> childNodes = new ArrayList<NetworkNode>();
 
-	private Object attachment;
-
 	protected NetworkNode(final String name, final NetworkNode parentNode, final NetworkTopology networkTopology) {
 		this.name = name;
 		this.parentNode = parentNode;
@@ -119,14 +117,6 @@ public class NetworkNode implements IOReadableWritable {
 		return this.childNodes.size();
 	}
 
-	public void setAttachment(final Object attachment) {
-		this.attachment = attachment;
-	}
-
-	public Object getAttachment() {
-		return this.attachment;
-	}
-
 	public NetworkNode getChildNode(final int index) {
 
 		if (index < this.childNodes.size()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
index 0ca490b..554bac5 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
@@ -56,6 +56,7 @@ public final class IOUtils {
 	public static void copyBytes(final InputStream in, final OutputStream out, final int buffSize, final boolean close)
 			throws IOException {
 
+		@SuppressWarnings("resource")
 		final PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
 		final byte[] buf = new byte[buffSize];
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index e4f0a4b..fe63ebe 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -14,7 +14,6 @@
 package eu.stratosphere.pact.runtime.cache;
 
 import eu.stratosphere.api.common.cache.DistributedCache;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -138,14 +137,12 @@ public class FileCache {
 	 * Asynchronous file copy process
 	 */
 	private class CopyProcess implements Callable<Path> {
+		
 		private JobID jobID;
-		@SuppressWarnings("unused")
-		private String name;
 		private String filePath;
 		private Boolean executable;
 
 		public CopyProcess(String name, DistributedCacheEntry e, JobID jobID) {
-			this.name = name;
 			this.filePath = e.filePath;
 			this.executable = e.isExecutable;
 			this.jobID = jobID;
@@ -168,15 +165,13 @@ public class FileCache {
 	 * If no task is using this file after 5 seconds, clear it.
 	 */
 	private class DeleteProcess implements Runnable {
+		
 		private String name;
-		@SuppressWarnings("unused")
-		private String filePath;
 		private JobID jobID;
 		private int oldCount;
 
 		public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
 			this.name = name;
-			this.filePath = e.filePath;
 			this.jobID = jobID;
 			this.oldCount = c;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
index ddfa446..a060d28 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
@@ -60,7 +60,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
 			TypeSerializer<V1> serializer1, TypeComparator<V1> comparator1,
 			TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
 			TypePairComparator<V2, V1> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
 	throws MemoryAllocationException
 	{		
 		this.memManager = memManager;
@@ -73,7 +73,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
 		this.probeCopy = serializer2.createInstance();
 		
 		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2, pairComparator,
-			memManager, ioManager, ownerTask, totalMemory);
+			memManager, ioManager, ownerTask, memoryFraction);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -152,10 +152,10 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
 	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
 	throws MemoryAllocationException
 	{
-		final int numPages = memManager.computeNumberOfPages(totalMemory);
+		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
 		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
index d699462..8c2b9ca 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
@@ -38,21 +38,21 @@ public class BuildFirstReOpenableHashMatchIterator<V1, V2, O> extends BuildFirst
 			TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
 			TypePairComparator<V2, V1> pairComparator,
 			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask, long totalMemory)
+			AbstractInvokable ownerTask, double memoryFraction)
 			throws MemoryAllocationException {
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				totalMemory);
+				memoryFraction);
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
 
 	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
 	throws MemoryAllocationException
 	{
-		final int numPages = memManager.computeNumberOfPages(totalMemory);
+		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
 		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
 	}