You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:17 UTC

[20/34] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index eba81a2..846ca2e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -75,7 +75,7 @@ import eu.stratosphere.nephele.instance.InstanceManager;
 import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.instance.local.LocalInstanceManager;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.ipc.RPC;
 import eu.stratosphere.nephele.ipc.Server;
 import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
@@ -90,8 +90,6 @@ import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager;
 import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
 import eu.stratosphere.nephele.jobmanager.web.WebInfoServer;
 import eu.stratosphere.nephele.managementgraph.ManagementGraph;
-import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
-import eu.stratosphere.nephele.multicast.MulticastManager;
 import eu.stratosphere.nephele.profiling.JobManagerProfiler;
 import eu.stratosphere.nephele.profiling.ProfilingUtils;
 import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
@@ -103,11 +101,10 @@ import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
 import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
 import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
 import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;
-import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 import eu.stratosphere.nephele.types.IntegerRecord;
 import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -141,8 +138,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	private final InputSplitManager inputSplitManager;
 
 	private final AbstractScheduler scheduler;
-
-	private final MulticastManager multicastManager;
 	
 	private AccumulatorManager accumulatorManager;
 
@@ -246,9 +241,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			throw new Exception("Unable to load scheduler " + schedulerClassName);
 		}
 
-		// Create multicastManager
-		this.multicastManager = new MulticastManager(this.scheduler);
-
 		// Load profiler if it should be used
 		if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
 			final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.JOBMANAGER_CLASSNAME_KEY,
@@ -732,8 +724,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 
 	@Override
-	public ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectionInfo caller, final JobID jobID,
-			final ChannelID sourceChannelID) {
+	public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
 
 		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
 		if (eg == null) {
@@ -754,7 +745,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 		if (sourceChannelID.equals(edge.getInputChannelID())) {
 			// Request was sent from an input channel
-
 			final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
 
 			final AbstractInstance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
@@ -768,9 +758,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			// Check execution state
 			final ExecutionState executionState = connectedVertex.getExecutionState();
 			if (executionState == ExecutionState.FINISHED) {
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady();
+				// that should not happen. if there is data pending, the receiver cannot be ready
+				return ConnectionInfoLookupResponse.createReceiverNotFound();
 			}
 
+			// running is common, finishing is happens when the lookup is for the close event
 			if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
 				// LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
 				return ConnectionInfoLookupResponse.createReceiverNotReady();
@@ -781,74 +773,53 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
 			} else {
 				// Receiver runs on a different task manager
-
 				final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-				final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());
+				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
 
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge
-					.getConnectionID()));
+				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
 			}
 		}
+		// else, the request is for an output channel
+		// Find vertex of connected input channel
+		final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
 
-		if (edge.isBroadcast()) {
+		// Check execution state
+		final ExecutionState executionState = targetVertex.getExecutionState();
 
-			return multicastManager.lookupConnectionInfo(caller, jobID, sourceChannelID);
+		// check whether the task needs to be deployed
+		if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
 
-		} else {
-
-			// Find vertex of connected input channel
-			final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
-
-			// Check execution state
-			final ExecutionState executionState = targetVertex.getExecutionState();
-
-			if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING
-					&& executionState != ExecutionState.FINISHED) {
-
-				if (executionState == ExecutionState.ASSIGNED) {
-
-					final Runnable command = new Runnable() {
-
-						/**
-						 * {@inheritDoc}
-						 */
-						@Override
-						public void run() {
-							scheduler.deployAssignedVertices(targetVertex);
-						}
-					};
-
-					eg.executeCommand(command);
-				}
-
-				// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
-				return ConnectionInfoLookupResponse.createReceiverNotReady();
-			}
-
-			final AbstractInstance assignedInstance = targetVertex.getAllocatedResource().getInstance();
-			if (assignedInstance == null) {
-				LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID()
-					+ " but no instance assigned");
-				// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
-				return ConnectionInfoLookupResponse.createReceiverNotReady();
+			if (executionState == ExecutionState.ASSIGNED) {
+				final Runnable command = new Runnable() {
+					@Override
+					public void run() {
+						scheduler.deployAssignedVertices(targetVertex);
+					}
+				};
+				eg.executeCommand(command);
 			}
 
-			if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-				// Receiver runs on the same task manager
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
-			} else {
-				// Receiver runs on a different task manager
-				final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-				final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());
+			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
+			return ConnectionInfoLookupResponse.createReceiverNotReady();
+		}
 
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge
-					.getConnectionID()));
-			}
+		final AbstractInstance assignedInstance = targetVertex.getAllocatedResource().getInstance();
+		if (assignedInstance == null) {
+			LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
+			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
+			return ConnectionInfoLookupResponse.createReceiverNotReady();
 		}
 
-		// LOG.error("Receiver(s) not found");
+		if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
+			// Receiver runs on the same task manager
+			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
+		} else {
+			// Receiver runs on a different task manager
+			final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
+			final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
 
-		// return ConnectionInfoLookupResponse.createReceiverNotFound();
+			return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
+		}
 	}
 
 	/**
@@ -921,40 +892,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		return eventList;
 	}
 
-
-	@Override
-	public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
-
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-		if (eg == null) {
-			LOG.error("Cannot find execution graph for job " + jobID);
-			return;
-		}
-
-		final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
-		if (vertex == null) {
-			LOG.error("Cannot find execution vertex with ID " + id);
-			return;
-		}
-
-		LOG.info("Killing task " + vertex + " of job " + jobID);
-
-		final Runnable runnable = new Runnable() {
-
-			@Override
-			public void run() {
-
-				final TaskKillResult result = vertex.killTask();
-				if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
-					LOG.error(result.getDescription());
-				}
-			}
-		};
-
-		eg.executeCommand(runnable);
-	}
-
-
 	@Override
 	public void killInstance(final StringRecord instanceName) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
index c9e02d3..24e2970 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
@@ -210,7 +210,7 @@ public abstract class AbstractScheduler implements InstanceListener {
 			case NETWORK:
 				deployTarget = false;
 				break;
-			case INMEMORY:
+			case IN_MEMORY:
 				deployTarget = true;
 				break;
 			default:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
index efbaf93..762b494 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
@@ -32,8 +32,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.instance.AbstractInstance;
 import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.AbstractTaskResult.ReturnCode;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
 import eu.stratosphere.nephele.util.SerializableHashSet;
 import eu.stratosphere.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
index 8ecd99f..58486da 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
  * This class implements a directed edge of a {@link ManagementGraph}. The edge is derived from a channel of the actual

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
index 20107a7..d263999 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
@@ -13,8 +13,8 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.nephele.AbstractID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 
 /**
  * A management edge ID uniquely identifies a {@link ManagementEdge}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
index 73548be..63aa6f7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
 /**
  * A management gate ID uniquely identifies a {@link ManagementGate}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
index f4a9855..374656b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.util.EnumUtils;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
index 01a0903..7e6d554 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
  * This class implements a directed edge of between two {@link ManagementGroupVertex} objects. The edge is derived from

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
index bbf5fd2..b98a153 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
@@ -24,7 +24,7 @@ import java.util.Map;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.util.EnumUtils;
 import eu.stratosphere.util.StringUtils;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
index b5c1055..aef64af 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
@@ -15,7 +15,7 @@ package eu.stratosphere.nephele.managementgraph;
 
 import javax.xml.bind.DatatypeConverter;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
 /**
  * A management group vertex ID uniquely identifies a {@link ManagementGroupVertex}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
index 716ae7f..486b3fa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
 
 /**
  * A management vertex ID uniquely identifies a {@link ManagementVertex}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java
deleted file mode 100644
index 87e0181..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/**
- * This class represents a cluster of hosts within a multicast-tree.
- * 
- */
-
-public class MulticastCluster {
-	TreeNode master = null;
-
-	HashSet<TreeNode> clusternodes = new HashSet<TreeNode>();
-
-	public void addNode(TreeNode node) {
-		this.clusternodes.add(node);
-	}
-
-	public int getSize() {
-		return this.clusternodes.size();
-	}
-
-	public HashSet<TreeNode> getNodes() {
-		return this.clusternodes;
-	}
-
-	/**
-	 * Returns the master-node of the current cluster.
-	 * 
-	 * @return
-	 */
-	public TreeNode getMaster() {
-
-		if (this.master == null) {
-
-			// TODO: topology-aware!!
-			if (clusternodes.size() != 0) {
-				this.master = clusternodes.iterator().next();
-			} else {
-				System.out.println("cluster is empty.");
-				return null;
-			}
-
-		}
-
-		return this.master;
-	}
-
-	/**
-	 * Splits the cluster into an arbitrary number of clusters not exceeding maxsize.
-	 * 
-	 * @param maxsize
-	 * @return
-	 */
-	public HashSet<MulticastCluster> splitCluster(int maxsize) {
-		// TODO: topology-aware!
-		HashSet<MulticastCluster> newClusters = new HashSet<MulticastCluster>();
-
-		MulticastCluster actualcluster = new MulticastCluster();
-
-		for (Iterator<TreeNode> i = this.clusternodes.iterator(); i.hasNext();) {
-			if (actualcluster.getSize() < maxsize) {
-				actualcluster.addNode(i.next());
-			} else {
-				// cluster is full.. add old cluster to list
-				newClusters.add(actualcluster);
-				// and create new cluster object
-				actualcluster = new MulticastCluster();
-				actualcluster.addNode(i.next());
-			}
-		}
-
-		newClusters.add(actualcluster);
-
-		return newClusters;
-
-	}
-
-	public static MulticastCluster createInitialCluster(Collection<TreeNode> nodes) {
-		// TODO: topology-aware? in here?
-
-		MulticastCluster cluster = new MulticastCluster();
-
-		for (TreeNode n : nodes) {
-			cluster.addNode(n);
-		}
-
-		return cluster;
-
-	}
-
-	public static MulticastForwardingTable createClusteredTree(LinkedList<TreeNode> nodes, int maxclustersize) {
-
-		return null;
-		/*
-		// List to store all levels of the clustered multicast tree
-		LinkedList<HashSet<MulticastCluster>> clusterlist = new LinkedList<HashSet<MulticastCluster>>();
-
-		// Poll off the sending node first..
-		TreeNode source = nodes.pollFirst();
-
-		// Create an initital multicast cluster containing all receivers
-		MulticastCluster initialcluster = createInitialCluster(nodes);
-
-		// Create a first layer of clusters with arbitrary size by splitting the initital cluster
-		HashSet<MulticastCluster> firstlaycluster = initialcluster.splitCluster(maxclustersize);
-
-		// add to the list of cluster layers
-		clusterlist.add(firstlaycluster);
-
-		// we want the top layer to consist of max. maxclustersize clusters
-		while (clusterlist.getFirst().size() > maxclustersize) {
-
-			// creating a new cluster-layer...
-			MulticastCluster nextlayercluster = new MulticastCluster();
-
-			HashSet<MulticastCluster> lowerlayer = clusterlist.getFirst();
-
-			// add all master nodes from current layer to next-layer cluster...
-			for (MulticastCluster c : lowerlayer) {
-				nextlayercluster.addNode(c.getMaster());
-			}
-
-			// if our next-layer cluster is still too big, we need to split it again
-			HashSet<MulticastCluster> nextlayerclusters = nextlayercluster.splitCluster(maxclustersize);
-
-			// and finally ad the new layer of clusters
-			clusterlist.addFirst(nextlayerclusters);
-
-		}
-
-		// now we can create the tree...
-
-		MulticastForwardingTable table = new MulticastForwardingTable();
-
-		HashSet<MulticastCluster> initialclusterlevel = clusterlist.getFirst();
-
-		ConnectionInfoLookupResponse sourceentry = ConnectionInfoLookupResponse.createReceiverFoundAndReady();
-
-		// add all local targets
-		for (ChannelID id : source.getLocalTargets()) {
-			System.out.println("local target: " + id);
-			sourceentry.addLocalTarget(id);
-		}
-
-		// connect source node with all master nodes in top-level clusters
-		for (MulticastCluster c : initialclusterlevel) {
-			sourceentry.addRemoteTarget(c.getMaster().getConnectionInfo());
-		}
-
-		table.addConnectionInfo(source.getConnectionInfo(), sourceentry);
-		System.out.println("forwards for node: " + source.getConnectionInfo());
-		System.out.println(sourceentry);
-		// now we have connected the source node to the initial cluster layer. iterate through cluster layers and
-		// connect
-
-		while (clusterlist.size() > 0) {
-			HashSet<MulticastCluster> actualclusterlevel = clusterlist.pollFirst();
-
-			// add remote targets!
-
-			for (MulticastCluster c : actualclusterlevel) {
-				TreeNode master = c.getMaster();
-				for (Iterator<TreeNode> i = c.getNodes().iterator(); i.hasNext();) {
-					TreeNode actualnode = i.next();
-					if (!actualnode.equals(master)) {
-						// add remote target at master of current cluster
-						master.addRemoteTarget(actualnode.getConnectionInfo());
-					}
-				}
-			}
-
-		}
-
-		// now iterate through all nodes and create forwarding table...
-		// we already have the entry for the source node..
-		for (TreeNode n : nodes) {
-			ConnectionInfoLookupResponse actualentry = ConnectionInfoLookupResponse.createReceiverFoundAndReady();
-			for (ChannelID localTarget : n.getLocalTargets()) {
-				actualentry.addLocalTarget(localTarget);
-			}
-			for (InstanceConnectionInfo remoteTarget : n.getRemoteTargets()) {
-				actualentry.addRemoteTarget(remoteTarget);
-			}
-			table.addConnectionInfo(n.getConnectionInfo(), actualentry);
-			System.out.println("forwards for node: " + n.getConnectionInfo());
-			System.out.println(actualentry);
-		}
-
-		return table;
-		*/
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java
deleted file mode 100644
index d08821e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-
-/**
- * This class contains ConnectionInfoLookupResponse objects containing local, as well as remote receivers for all
- * instances within a certain job-specific multicast tree.
- * 
- */
-public class MulticastForwardingTable {
-
-	private final Map<InstanceConnectionInfo, ConnectionInfoLookupResponse> forwardingTable = new HashMap<InstanceConnectionInfo, ConnectionInfoLookupResponse>();
-
-	/**
-	 * Returns the related ConnectionInfoLookupResponse for the calling Instance.
-	 * 
-	 * @param caller
-	 * @return
-	 */
-	public ConnectionInfoLookupResponse getConnectionInfo(InstanceConnectionInfo caller) {
-		if (this.forwardingTable.containsKey(caller)) {
-			return this.forwardingTable.get(caller);
-		} else {
-			return null;
-		}
-	}
-
-	protected void addConnectionInfo(InstanceConnectionInfo caller, ConnectionInfoLookupResponse response) {
-		this.forwardingTable.put(caller, response);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java
deleted file mode 100644
index 0a7b471..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
-import eu.stratosphere.nephele.executiongraph.ExecutionGate;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
-import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-
-/**
- * The MulticastManager is responsible for the creation and storage of application-layer multicast trees used to
- * broadcast records to multiple target vertices.
- * 
- */
-
-public final class MulticastManager implements ChannelLookupProtocol {
-
-	/**
-	 * The log object used to report errors and warnings.
-	 */
-	private static final Log LOG = LogFactory.getLog(JobManager.class);
-
-	/**
-	 * Indicates if the arrangement of nodes within the overlay-tree should be randomized or not. If set to false,
-	 * arrangement of the same set of receiver nodes is guaranteed to be the same
-	 */
-	private final boolean randomized;
-
-	/**
-	 * Indicates if the tree should be constructed with a given topology stored in a file.
-	 */
-	private final boolean useHardCodedTree;
-
-	/**
-	 * File containing the hard-coded tree topology, if desired should contain node names (e.g. hostnames) with
-	 * corresponding children per line.
-	 * For example, a line "vm1.local vm2.local vm3.local" would result in vm1.local connecting to vm2.local and
-	 * vm3.local as children no further checking for connectivity of the given topology is done!
-	 */
-	private final String hardCodedTreeFilePath;
-
-	/**
-	 * Indicates the desired branching of the generated multicast-tree. 0 means unicast transmisison, 1 sequential tree,
-	 * 2 binomial tree, 3+ clustered tree
-	 */
-	private final int treeBranching;
-
-	/**
-	 * Reference to the scheduler.
-	 */
-	private final AbstractScheduler scheduler;
-
-	/**
-	 * Map caching already computed multicast forwarding tables.
-	 */
-	private final Map<ChannelID, MulticastForwardingTable> cachedTrees = new HashMap<ChannelID, MulticastForwardingTable>();
-
-	/**
-	 * Constructs a new multicast manager.
-	 * 
-	 * @param scheduler
-	 *        reference to the scheduler
-	 */
-	public MulticastManager(final AbstractScheduler scheduler) {
-
-		this.scheduler = scheduler;
-
-		this.randomized = GlobalConfiguration.getBoolean("multicast.randomize", false);
-		this.treeBranching = GlobalConfiguration.getInteger("multicast.branching", 1);
-		this.useHardCodedTree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false);
-		this.hardCodedTreeFilePath = GlobalConfiguration.getString("multicast.hardcodedtreefile", null);
-	}
-
-	/**
-	 * Retrieves all recipients of a data for the given <code>sourceChannelID</code>. Returns both local recipients as
-	 * well as next-hop remote instances within the multicast-tree.
-	 * 
-	 * @param caller
-	 *        the {@link InstanceConnectionInfo} object of the task manager which calls this method
-	 * @param jobID
-	 *        the ID of the job the channel ID belongs to
-	 * @param sourceChannelID
-	 *        the ID of the channel to resolve
-	 * @return the lookup response containing the connection info and a return code
-	 */
-	public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectionInfo caller,
-			final JobID jobID, final ChannelID sourceChannelID) {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID);
-		}
-
-		// Check if the tree is already created and cached
-		if (this.cachedTrees.containsKey(sourceChannelID)) {
-
-			LOG.info("Replying with cached entry...");
-			return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
-
-		} else {
-
-			// No tree exists, so we assume that this is the sending node initiating a multicast
-
-			if (!checkIfAllTargetVerticesReady(caller, jobID, sourceChannelID)) {
-				LOG.info("Received multicast request but not all receivers ready.");
-
-				return ConnectionInfoLookupResponse.createReceiverNotReady();
-			}
-
-			// Receivers are up and running.. extract tree nodes...
-			LinkedList<TreeNode> treeNodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized);
-
-			// Do we want to use a hard-coded tree topology?
-			if (this.useHardCodedTree) {
-				LOG.info("Creating a hard-coded tree topology from file: " + hardCodedTreeFilePath);
-				cachedTrees.put(sourceChannelID, createHardCodedTree(treeNodes));
-				return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
-			}
-
-			// Otherwise we create a default tree and put it into the tree-cache
-			cachedTrees.put(sourceChannelID, createDefaultTree(treeNodes, this.treeBranching));
-			return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
-
-		}
-
-	}
-
-	/**
-	 * Returns and removes the TreeNode which is closest to the given indicator.
-	 * 
-	 * @param indicator
-	 * @param nodes
-	 * @return
-	 */
-	private TreeNode pollClosestNode(final TreeNode indicator, final LinkedList<TreeNode> nodes) {
-
-		TreeNode closestnode = getClosestNode(indicator, nodes);
-
-		nodes.remove(closestnode);
-
-		return closestnode;
-
-	}
-
-	/**
-	 * Returns the TreeNode which is closest to the given indicator Node. Proximity is determined
-	 * either using topology-information (if given), penalty information (if given) or it returns
-	 * the first node in the list.
-	 * 
-	 * @param indicator
-	 * @param nodes
-	 * @return
-	 */
-	private TreeNode getClosestNode(final TreeNode indicator, final LinkedList<TreeNode> nodes) {
-
-		if (indicator == null) {
-			return nodes.getFirst();
-		}
-
-		TreeNode closestNode = null;
-		for (TreeNode n : nodes) {
-			if (closestNode == null || n.getDistance(indicator) < closestNode.getDistance(indicator)) {
-				closestNode = n;
-			}
-		}
-
-		return closestNode;
-	}
-
-	/**
-	 * This method creates a tree with an arbitrary fan out (two means binary tree).
-	 * If topology information or penalties are available, it considers that.
-	 * If fanout is set to 1, it creates a sequential tree.
-	 * if fanout is set to Integer.MAXVALUE, it creates a unicast tree.
-	 * 
-	 * @param nodes
-	 * @param fanout
-	 * @return
-	 */
-	private MulticastForwardingTable createDefaultTree(LinkedList<TreeNode> nodes, int fanout) {
-
-		// Store nodes that already have a parent, but no children
-		LinkedList<TreeNode> connectedNodes = new LinkedList<TreeNode>();
-
-		final TreeNode rootnode = nodes.pollFirst();
-		TreeNode actualnode = rootnode;
-
-		while (nodes.size() > 0) { // We still have unconnected nodes...
-
-			for (int i = 0; i < fanout; i++) {
-
-				if (nodes.size() > 0) {
-					// pick the closest one and attach to actualnode
-					TreeNode child = pollClosestNode(actualnode, nodes);
-					actualnode.addChild(child);
-
-					// The child is now connected and can be used as forwarder in the next iteration..
-					connectedNodes.add(child);
-				} else {
-					break;
-				}
-			}
-
-			// OK.. take the next node to attach children to it..
-			// TODO: Optimization? "pollBest()" ?
-			actualnode = connectedNodes.pollFirst();
-
-		}
-		LOG.info("created multicast tree with following topology:\n" + rootnode.printTree());
-
-		return rootnode.createForwardingTable();
-
-	}
-
-	/**
-	 * Reads a hard-coded tree topology from file and creates a tree according to the hard-coded
-	 * topology from the file.
-	 * 
-	 * @param nodes
-	 * @return
-	 */
-	private MulticastForwardingTable createHardCodedTree(LinkedList<TreeNode> nodes) {
-		try {
-			FileInputStream fstream = new FileInputStream(this.hardCodedTreeFilePath);
-			DataInputStream in = new DataInputStream(fstream);
-			BufferedReader br = new BufferedReader(new InputStreamReader(in));
-			String strLine;
-			while ((strLine = br.readLine()) != null) {
-				String[] values = strLine.split(" ");
-				String actualhostname = values[0];
-				for (TreeNode n : nodes) {
-					if (n.toString().equals(actualhostname)) {
-						// we found the node.. connect the children
-						for (int i = 1; i < values.length; i++) {
-							for (TreeNode childnode : nodes) {
-								if (childnode.toString().equals(values[i])) {
-									n.addChild(childnode);
-								}
-							}
-						}
-					}
-				}
-			}
-			br.close();
-			// First node is root.. create tree. easy
-			return nodes.getFirst().createForwardingTable();
-
-		} catch (Exception e) {
-			System.out.println("Error reading hard-coded topology file for multicast tree: " + e.getMessage());
-			return null;
-		}
-	}
-
-	/**
-	 * Checks, if all target vertices for multicast transmisison are ready. If vertices are in state ASSIGNED, it will
-	 * deploy those vertices.
-	 * 
-	 * @param caller
-	 * @param jobID
-	 * @param sourceChannelID
-	 * @return
-	 */
-	private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
-
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-
-		final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID);
-
-		final ExecutionGate broadcastGate = outputChannel.getOutputGate();
-
-		List<ExecutionVertex> verticesToDeploy = null;
-
-		// get all broadcast output channels
-		final int numberOfOutputChannels = broadcastGate.getNumberOfEdges();
-		for (int i = 0; i < numberOfOutputChannels; ++i) {
-
-			final ExecutionEdge c = broadcastGate.getEdge(i);
-
-			if (c.isBroadcast()) {
-
-				final ExecutionVertex targetVertex = c.getInputGate().getVertex();
-
-				if (targetVertex.getExecutionState() == ExecutionState.ASSIGNED) {
-					if (verticesToDeploy == null) {
-						verticesToDeploy = new ArrayList<ExecutionVertex>();
-					}
-					verticesToDeploy.add(targetVertex);
-				} else {
-
-					if (targetVertex.getExecutionState() != ExecutionState.RUNNING
-						&& targetVertex.getExecutionState() != ExecutionState.FINISHING) {
-						return false;
-					}
-				}
-			}
-		}
-
-		if (verticesToDeploy != null) {
-			this.scheduler.deployAssignedVertices(verticesToDeploy);
-			return false;
-		}
-
-		return true;
-	}
-
-	/**
-	 * Returns a list of (physical) Nodes (=hosts) within the multicast tree. Each node contains the local ChannelIDs,
-	 * records
-	 * must be forwarded to. The first node in the List is the only multicast sender.
-	 * 
-	 * @param sourceChannelID
-	 * @return
-	 */
-	private LinkedList<TreeNode> extractTreeNodes(final InstanceConnectionInfo source, final JobID jobID,
-			final ChannelID sourceChannelID, final boolean randomize) {
-
-		final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-
-		final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID);
-
-		final ExecutionGate broadcastGate = outputChannel.getOutputGate();
-
-		final LinkedList<ExecutionEdge> outputChannels = new LinkedList<ExecutionEdge>();
-
-		// Get all broadcast output channels
-		final int numberOfOutputChannels = broadcastGate.getNumberOfEdges();
-		for (int i = 0; i < numberOfOutputChannels; ++i) {
-			final ExecutionEdge c = broadcastGate.getEdge(i);
-
-			if (c.isBroadcast()) {
-				outputChannels.add(c);
-			}
-		}
-
-		final LinkedList<TreeNode> treeNodes = new LinkedList<TreeNode>();
-
-		LinkedList<ChannelID> actualLocalTargets = new LinkedList<ChannelID>();
-
-		int firstConnectionID = 0;
-		// search for local targets for the tree node
-		for (Iterator<ExecutionEdge> iter = outputChannels.iterator(); iter.hasNext();) {
-
-			final ExecutionEdge actualOutputChannel = iter.next();
-
-			// the connection ID should not be needed for the root node (as it is not set as remote receiver)
-			// but in order to maintain consistency, it also gets the connectionID of the first channel pointing to it
-			firstConnectionID = actualOutputChannel.getConnectionID();
-
-			final ExecutionVertex targetVertex = actualOutputChannel.getInputGate().getVertex();
-
-			// is the target vertex running on the same instance?
-			if (targetVertex.getAllocatedResource().getInstance().getInstanceConnectionInfo().equals(source)) {
-
-				actualLocalTargets.add(actualOutputChannel.getInputChannelID());
-				iter.remove();
-			}
-
-		}
-
-		// create sender node (root) with source instance
-		TreeNode actualNode = new TreeNode(eg.getVertexByChannelID(sourceChannelID).getAllocatedResource()
-			.getInstance(), source, firstConnectionID, actualLocalTargets);
-
-		treeNodes.add(actualNode);
-
-		// now we have the root-node.. lets extract all other nodes
-
-		LinkedList<TreeNode> receiverNodes = new LinkedList<TreeNode>();
-
-		while (outputChannels.size() > 0) {
-
-			final ExecutionEdge firstChannel = outputChannels.pollFirst();
-
-			// each receiver nodes' endpoint is associated with the connection ID
-			// of the first channel pointing to this node.
-			final int connectionID = firstChannel.getConnectionID();
-
-			final ExecutionVertex firstTarget = firstChannel.getInputGate().getVertex();
-
-			final InstanceConnectionInfo actualInstance = firstTarget.getAllocatedResource().getInstance()
-				.getInstanceConnectionInfo();
-
-			actualLocalTargets = new LinkedList<ChannelID>();
-
-			// add first local target
-			actualLocalTargets.add(firstChannel.getInputChannelID());
-
-			// now we iterate through the remaining channels to find other local targets...
-			for (Iterator<ExecutionEdge> iter = outputChannels.iterator(); iter.hasNext();) {
-
-				final ExecutionEdge actualOutputChannel = iter.next();
-
-				final ExecutionVertex actualTarget = actualOutputChannel.getInputGate().getVertex();
-
-				// is the target vertex running on the same instance?
-				if (actualTarget.getAllocatedResource().getInstance().getInstanceConnectionInfo()
-					.equals(actualInstance)) {
-					actualLocalTargets.add(actualOutputChannel.getInputChannelID());
-
-					iter.remove();
-
-				}
-
-			}// end for
-
-			// create tree node for current instance
-			actualNode = new TreeNode(firstTarget.getAllocatedResource().getInstance(), actualInstance, connectionID,
-				actualLocalTargets);
-
-			receiverNodes.add(actualNode);
-
-		}// end while
-
-		// Do we want to shuffle the receiver nodes?
-		// Only randomize the receivers, as the sender (the first one) has to stay the same
-		if (randomize) {
-			Collections.shuffle(receiverNodes);
-		} else {
-			// Sort Tree Nodes according to host name..
-			Collections.sort(receiverNodes);
-		}
-
-		treeNodes.addAll(receiverNodes);
-
-		return treeNodes;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java
deleted file mode 100644
index 0e80ba2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-public class TopologyInformationSupplier {
-	
-	
-	/**
-	 * Returns a float representing the network distance between two nodes.
-	 * @param node1
-	 * @param node2
-	 * @return
-	 */
-	public float getNetworkDistance(String node1, String node2){
-		//TODO: Implement me
-		return 0;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java
deleted file mode 100644
index 12ee998..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;
-
-/**
- * Each physical node (instance) within a multicast tree is represented by a TreeNode object.
- * It contains the connection info for the certain node and a list of the local output channels.
- * 
- */
-
-public class TreeNode implements Comparable<TreeNode> {
-
-	private TreeNode parentnode = null;
-
-	private final AbstractInstance instance;
-
-	private final InstanceConnectionInfo nodeConnectionInfo;
-
-	private final int connectionID;
-
-	private final LinkedList<ChannelID> localTargets;
-
-	private final LinkedList<TreeNode> children = new LinkedList<TreeNode>();
-
-	private final LinkedList<IntegerProperty> properties = new LinkedList<TreeNode.IntegerProperty>();
-
-	private int penalty = 0;
-
-	public TreeNode(AbstractInstance instance, InstanceConnectionInfo nodeConnectionInfo, int connectionID,
-			LinkedList<ChannelID> localTargets) {
-		this.instance = instance;
-		this.connectionID = connectionID;
-		this.nodeConnectionInfo = nodeConnectionInfo;
-		this.localTargets = localTargets;
-	}
-
-	public void setProperty(String key, int value) {
-		boolean exists = false;
-		for (IntegerProperty property : this.properties) {
-			if (property.getKey().equals(key)) {
-				property.setValue(value);
-				exists = true;
-				break;
-			}
-		}
-		if (!exists) {
-			this.properties.add(new IntegerProperty(key, value));
-		}
-	}
-
-	public int getProperty(String key) {
-		for (IntegerProperty property : this.properties) {
-			if (property.getKey().equals(key)) {
-				return property.getValue();
-			}
-		}
-		return -1;
-	}
-
-	public void removeChild(TreeNode child) {
-		if (this.children.contains(child)) {
-			child.setParent(null);
-			this.children.remove(child);
-		}
-	}
-
-	public void addChild(TreeNode child) {
-		this.children.add(child);
-		child.setParent(this);
-	}
-
-	public LinkedList<TreeNode> getChildren() {
-		return this.children;
-	}
-
-	public TreeNode getParent() {
-		return this.parentnode;
-	}
-
-	private InstanceConnectionInfo getConnectionInfo() {
-		return this.nodeConnectionInfo;
-	}
-
-	private int getConnectionID() {
-		return this.connectionID;
-	}
-
-	private void setParent(TreeNode parent) {
-		this.parentnode = parent;
-	}
-
-	@Override
-	public int compareTo(TreeNode o) {
-		return this.nodeConnectionInfo.compareTo(o.nodeConnectionInfo);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (o instanceof TreeNode) {
-			return this.nodeConnectionInfo.equals(((TreeNode) o).nodeConnectionInfo);
-		} else {
-			return false;
-		}
-	}
-
-	public int getDistance(TreeNode o) {
-		return this.instance.getDistance(o.instance);
-	}
-
-	public String toString() {
-		return this.nodeConnectionInfo.toString();
-	}
-
-	public int getPenalty() {
-		return this.penalty;
-	}
-
-	public void setPenalty(int penalty) {
-		this.penalty = penalty;
-	}
-
-	/**
-	 * This method should be called on the root node (sender node).
-	 * It traverses the Tree and returns a full forwarding table
-	 * including all local and remote receivers.
-	 * 
-	 * @return
-	 */
-	public MulticastForwardingTable createForwardingTable() {
-		MulticastForwardingTable table = new MulticastForwardingTable();
-		this.generateRecursiveForwardingTable(table);
-		return table;
-	}
-
-	/**
-	 * Private recursive method to generate forwarding table
-	 * 
-	 * @param table
-	 */
-	private void generateRecursiveForwardingTable(MulticastForwardingTable table) {
-
-		final ConnectionInfoLookupResponse lookupResponse = ConnectionInfoLookupResponse.createReceiverFoundAndReady();
-
-		// add local targets
-		for (final ChannelID i : this.localTargets) {
-			lookupResponse.addLocalTarget(i);
-		}
-
-		// add remote targets
-		for (final TreeNode n : this.children) {
-
-			// Instance Connection info associated with the remote target
-			final InstanceConnectionInfo ici = n.getConnectionInfo();
-
-			// get the connection ID associated with the remote target endpoint
-			final int icid = n.getConnectionID();
-
-			final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());
-
-			lookupResponse.addRemoteTarget(new RemoteReceiver(isa, icid));
-		}
-
-		table.addConnectionInfo(this.nodeConnectionInfo, lookupResponse);
-
-		for (final TreeNode n : this.children) {
-			n.generateRecursiveForwardingTable(table);
-		}
-	}
-
-	/**
-	 * Prints the tree in a human readable format, starting with the actual node as root.
-	 * 
-	 * @return
-	 */
-	public String printTree() {
-
-		StringBuilder sb = new StringBuilder();
-		this.printRecursiveTree(sb);
-		return sb.toString();
-	}
-
-	private void printRecursiveTree(StringBuilder sb) {
-
-		if (this.children.size() > 0) {
-			sb.append("STRUCT ");
-
-			sb.append(this.nodeConnectionInfo);
-
-			for (TreeNode n : this.children) {
-				sb.append(" ");
-				sb.append(n.getConnectionInfo().toString());
-			}
-
-			sb.append("\n");
-
-			for (TreeNode n : this.children) {
-				n.printRecursiveTree(sb);
-			}
-		}
-	}
-
-	private static class IntegerProperty {
-
-		private String key = null;
-
-		private int value = 0;
-
-		public IntegerProperty(final String key, final int value) {
-			this.key = key;
-			this.value = value;
-		}
-
-		public String getKey() {
-			return this.key;
-		}
-
-		public int getValue() {
-			return this.value;
-		}
-
-		public void setValue(final int value) {
-			this.value = value;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
index b1d6ff6..302192b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
@@ -15,7 +15,7 @@ package eu.stratosphere.nephele.profiling;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
+import eu.stratosphere.nephele.taskmanager.Task;
 
 /**
  * This interface must be implemented by profiling components
@@ -32,8 +32,7 @@ public interface TaskManagerProfiler {
 	 * @param jobConfiguration
 	 *        the job configuration sent with the task
 	 */
-	void registerExecutionListener(RuntimeTask task, Configuration jobConfiguration);
-
+	void registerExecutionListener(Task task, Configuration jobConfiguration);
 
 	/**
 	 * Unregisters all previously register {@link ExecutionListener} objects for

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
index 49fa8d1..58f0f38 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
@@ -13,20 +13,6 @@
 
 package eu.stratosphere.nephele.profiling.impl;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.nephele.execution.Environment;
@@ -40,8 +26,21 @@ import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
 import eu.stratosphere.nephele.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
 import eu.stratosphere.nephele.profiling.impl.types.InternalInstanceProfilingData;
 import eu.stratosphere.nephele.profiling.impl.types.ProfilingDataContainer;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
+import eu.stratosphere.nephele.taskmanager.Task;
 import eu.stratosphere.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerProfiler {
 
@@ -99,7 +98,7 @@ public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerPro
 
 
 	@Override
-	public void registerExecutionListener(final RuntimeTask task, final Configuration jobConfiguration) {
+	public void registerExecutionListener(final Task task, final Configuration jobConfiguration) {
 
 		// Register profiling hook for the environment
 		task.registerExecutionListener(new EnvironmentListenerImpl(this, task.getRuntimeEnvironment()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
index 3f1d669..8fdaf55 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
@@ -17,9 +17,9 @@ import java.io.IOException;
 
 import eu.stratosphere.core.protocols.VersionedProtocol;
 import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
 
 /**
  * The channel lookup protocol can be used to resolve the ID of an output channel to all recipients which shall receive

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
index a2578a5..461c797 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
@@ -24,7 +24,6 @@ import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.managementgraph.ManagementGraph;
-import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 
 /**
@@ -82,18 +81,6 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
 	List<AbstractEvent> getEvents(JobID jobID) throws IOException;
 
 	/**
-	 * Kills the task with the given vertex ID.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the vertex to be killed belongs to
-	 * @param id
-	 *        the vertex ID which identified the task be killed
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the kill request
-	 */
-	void killTask(JobID jobID, ManagementVertexID id) throws IOException;
-
-	/**
 	 * Kills the instance with the given name (i.e. shuts down its task manager).
 	 * 
 	 * @param instanceName

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
index 63509ab..19522db 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
@@ -23,9 +23,8 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
 import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
 
 /**
@@ -59,17 +58,6 @@ public interface TaskOperationProtocol extends VersionedProtocol {
 	TaskCancelResult cancelTask(ExecutionVertexID id) throws IOException;
 
 	/**
-	 * Advises the task manager to kill the task with the given ID.
-	 * 
-	 * @param id
-	 *        the ID of the task to kill
-	 * @return the result of the task kill attempt
-	 * @throws IOException
-	 *         thrown if an error occurs during this remote procedure call
-	 */
-	TaskKillResult killTask(ExecutionVertexID id) throws IOException;
-
-	/**
 	 * Queries the task manager about the cache status of the libraries stated in the {@link LibraryCacheProfileRequest}
 	 * object.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
index be24922..8fdaa5d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
@@ -134,9 +134,7 @@ public abstract class BlockChannelAccess<R extends IORequest, C extends Collecti
 						this.closeLock.wait(1000);
 						checkErroneous();
 					}
-					catch (InterruptedException iex) {
-						throw new IOException("Block channel access was interrupted while closing.");
-					}
+					catch (InterruptedException iex) {}
 				}
 			}
 			finally {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
index 7680843..a8fe096 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
@@ -56,8 +56,7 @@ public interface MemoryManager {
 	
 	/**
 	 * Releases all memory segments for the given task. 
-	 * 
-	 * @param <T> The type of memory segment.
+	 *
 	 * @param task The task whose memory segments are to be released.
 	 */
 	void releaseAll(AbstractInvokable task);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
index 5a74bab..8bc7b13 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
@@ -13,7 +13,6 @@
 
 package eu.stratosphere.nephele.services.memorymanager.spi;
 
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -125,7 +124,6 @@ public class DefaultMemoryManager implements MemoryManager {
 		}
 	}
 
-
 	@Override
 	public void shutdown() {
 		// -------------------- BEGIN CRITICAL SECTION -------------------
@@ -150,7 +148,6 @@ public class DefaultMemoryManager implements MemoryManager {
 		}
 		// -------------------- END CRITICAL SECTION -------------------
 	}
-	
 
 	public boolean verifyEmpty() {
 		synchronized (this.lock) {
@@ -161,7 +158,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	// ------------------------------------------------------------------------
 	//                 MemoryManager interface implementation
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public List<MemorySegment> allocatePages(AbstractInvokable owner, int numPages) throws MemoryAllocationException {
 		final ArrayList<MemorySegment> segs = new ArrayList<MemorySegment>(numPages);
@@ -212,7 +209,6 @@ public class DefaultMemoryManager implements MemoryManager {
 	}
 	
 	// ------------------------------------------------------------------------
-	
 
 	@Override
 	public void release(MemorySegment segment) {
@@ -254,7 +250,6 @@ public class DefaultMemoryManager implements MemoryManager {
 		// -------------------- END CRITICAL SECTION -------------------
 	}
 
-
 	@Override
 	public <T extends MemorySegment> void release(Collection<T> segments) {
 		
@@ -317,7 +312,6 @@ public class DefaultMemoryManager implements MemoryManager {
 		// -------------------- END CRITICAL SECTION -------------------
 	}
 
-
 	@Override
 	public void releaseAll(AbstractInvokable owner) {
 		// -------------------- BEGIN CRITICAL SECTION -------------------
@@ -326,28 +320,27 @@ public class DefaultMemoryManager implements MemoryManager {
 			if (this.isShutDown) {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
-			
+
 			// get all segments
 			final Set<DefaultMemorySegment> segments = this.allocatedSegments.remove(owner);
-			
+
 			// all segments may have been freed previously individually
 			if (segments == null || segments.isEmpty()) {
 				return;
 			}
-			
+
 			// free each segment
 			for (DefaultMemorySegment seg : segments) {
 				final byte[] buffer = seg.destroy();
 				this.freeSegments.add(buffer);
 			}
-			
+
 			segments.clear();
 		}
 		// -------------------- END CRITICAL SECTION -------------------
 	}
 	
 	// ------------------------------------------------------------------------
-	
 
 	@Override
 	public int getPageSize() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java
new file mode 100644
index 0000000..bfad346
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java
@@ -0,0 +1,35 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.taskmanager;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecutorThreadFactory implements ThreadFactory {
+	
+	public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
+
+	private static final String THREAD_NAME = "Nephele Executor Thread ";
+	
+	private final AtomicInteger threadNumber = new AtomicInteger(1);
+	
+	
+	private ExecutorThreadFactory() {}
+	
+	
+	public Thread newThread(Runnable target) {
+		Thread t = new Thread(target, THREAD_NAME + threadNumber.getAndIncrement());
+		t.setDaemon(true);
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
index df9bbde..06eec0c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
@@ -15,42 +15,92 @@ package eu.stratosphere.nephele.taskmanager;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.nephele.execution.ExecutionListener;
+import eu.stratosphere.nephele.execution.ExecutionObserver;
 import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.execution.ExecutionStateTransition;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.TaskContext;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public final class Task implements ExecutionObserver {
+
+	/**
+	 * The log object used for debugging.
+	 */
+	private static final Log LOG = LogFactory.getLog(Task.class);
+
+	private final ExecutionVertexID vertexID;
+
+	private final RuntimeEnvironment environment;
+
+	private final TaskManager taskManager;
+
+	/**
+	 * Stores whether the task has been canceled.
+	 */
+	private volatile boolean isCanceled = false;
+
+	/**
+	 * The current execution state of the task
+	 */
+	private volatile ExecutionState executionState = ExecutionState.STARTING;
+
+	private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
+
+	public Task(final ExecutionVertexID vertexID, final RuntimeEnvironment environment,
+					   final TaskManager taskManager) {
+
+		this.vertexID = vertexID;
+		this.environment = environment;
+		this.taskManager = taskManager;
+
+		this.environment.setExecutionObserver(this);
+	}
 
-public interface Task {
 
 	/**
 	 * Returns the ID of the job this task belongs to.
 	 * 
 	 * @return the ID of the job this task belongs to
 	 */
-	JobID getJobID();
+	public JobID getJobID() {
+		return this.environment.getJobID();
+	}
 
 	/**
 	 * Returns the ID of this task.
 	 * 
 	 * @return the ID of this task
 	 */
-	ExecutionVertexID getVertexID();
+	public ExecutionVertexID getVertexID() {
+		return this.vertexID;
+	}
 
 	/**
 	 * Returns the environment associated with this task.
 	 * 
 	 * @return the environment associated with this task
 	 */
-	Environment getEnvironment();
+	public Environment getEnvironment() {
+		return this.environment;
+	}
 
 	/**
 	 * Marks the task as failed and triggers the appropriate state changes.
 	 */
-	void markAsFailed();
+	public void markAsFailed() {
+		executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
+	}
 
 	/**
 	 * Checks if the state of the thread which is associated with this task is <code>TERMINATED</code>.
@@ -58,22 +108,72 @@ public interface Task {
 	 * @return <code>true</code> if the state of this thread which is associated with this task is
 	 *         <code>TERMINATED</code>, <code>false</code> otherwise
 	 */
-	boolean isTerminated();
+	public boolean isTerminated() {
+		final Thread executingThread = this.environment.getExecutingThread();
+		if (executingThread.getState() == Thread.State.TERMINATED) {
+			return true;
+		}
+
+		return false;
+	}
 
 	/**
 	 * Starts the execution of this task.
 	 */
-	void startExecution();
+	public void startExecution() {
+
+		final Thread thread = this.environment.getExecutingThread();
+		thread.start();
+	}
 
 	/**
 	 * Cancels the execution of the task (i.e. interrupts the execution thread).
 	 */
-	void cancelExecution();
+	public void cancelExecution() {
+		final Thread executingThread = this.environment.getExecutingThread();
+
+		if (executingThread == null) {
+			return;
+		}
+
+		LOG.info("Canceling " + this.environment.getTaskNameWithIndex());
+
+		this.isCanceled = true;
+		// Change state
+		executionStateChanged(ExecutionState.CANCELING, null);
+
+		// Request user code to shut down
+		try {
+			final AbstractInvokable invokable = this.environment.getInvokable();
+			if (invokable != null) {
+				invokable.cancel();
+			}
+		} catch (Throwable e) {
+			LOG.error("Error while canceling task", e);
+		}
+
+		// Continuously interrupt the user thread until it changed to state CANCELED
+		while (true) {
+			executingThread.interrupt();
+
+			if (!executingThread.isAlive()) {
+				break;
+			}
+
+			try {
+				executingThread.join(1000);
+			} catch (InterruptedException e) {}
+
+			if (!executingThread.isAlive()) {
+				break;
+			}
+
+			if (LOG.isDebugEnabled())
+				LOG.debug("Sending repeated canceling  signal to " +
+						this.environment.getTaskName() + " with state " + this.executionState);
+		}
+	}
 
-	/**
-	 * Kills the task (i.e. interrupts the execution thread).
-	 */
-	void killExecution();
 
 	/**
 	 * Registers the task manager profiler with the task.
@@ -83,7 +183,9 @@ public interface Task {
 	 * @param jobConfiguration
 	 *        the configuration attached to the job
 	 */
-	void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration);
+	public void registerProfiler(final TaskManagerProfiler taskManagerProfiler, final Configuration jobConfiguration) {
+		taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
+	}
 
 	/**
 	 * Unregisters the task from the central memory manager.
@@ -91,7 +193,11 @@ public interface Task {
 	 * @param memoryManager
 	 *        the central memory manager
 	 */
-	void unregisterMemoryManager(MemoryManager memoryManager);
+	public void unregisterMemoryManager(final MemoryManager memoryManager) {
+		if (memoryManager != null) {
+			memoryManager.releaseAll(this.environment.getInvokable());
+		}
+	}
 
 	/**
 	 * Unregisters the task from the task manager profiler.
@@ -99,15 +205,124 @@ public interface Task {
 	 * @param taskManagerProfiler
 	 *        the task manager profiler
 	 */
-	void unregisterProfiler(TaskManagerProfiler taskManagerProfiler);
+	public void unregisterProfiler(final TaskManagerProfiler taskManagerProfiler) {
+		if (taskManagerProfiler != null) {
+			taskManagerProfiler.unregisterExecutionListener(this.vertexID);
+		}
+	}
 
 	/**
 	 * Returns the current execution state of the task.
 	 * 
 	 * @return the current execution state of the task
 	 */
-	ExecutionState getExecutionState();
+	public ExecutionState getExecutionState() {
+		return this.executionState;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                        ExecutionObserver methods
+	// -----------------------------------------------------------------------------------------------------------------
+	@Override
+	public void executionStateChanged(final ExecutionState newExecutionState, final String optionalMessage) {
+
+		// Check the state transition
+		ExecutionStateTransition.checkTransition(false, getTaskName(), this.executionState, newExecutionState);
+
+		// Make sure the reason for a transition to FAILED appears in the log files
+		if (newExecutionState == ExecutionState.FAILED) {
+			LOG.error(optionalMessage);
+		}
+
+		// Notify all listener objects
+		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
+		while (it.hasNext()) {
+			it.next().executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState,
+					optionalMessage);
+		}
+
+		// Store the new execution state
+		this.executionState = newExecutionState;
+
+		// Finally propagate the state change to the job manager
+		this.taskManager.executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState,
+				optionalMessage);
+	}
+
+	/**
+	 * Returns the name of the task associated with this observer object.
+	 *
+	 * @return the name of the task associated with this observer object
+	 */
+	private String getTaskName() {
+
+		return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/"
+				+ this.environment.getCurrentNumberOfSubtasks() + ")";
+	}
+
+
+	@Override
+	public void userThreadStarted(final Thread userThread) {
+
+		// Notify the listeners
+		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
+		while (it.hasNext()) {
+			it.next().userThreadStarted(this.environment.getJobID(), this.vertexID, userThread);
+		}
+	}
+
+
+	@Override
+	public void userThreadFinished(final Thread userThread) {
+
+		// Notify the listeners
+		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
+		while (it.hasNext()) {
+			it.next().userThreadFinished(this.environment.getJobID(), this.vertexID, userThread);
+		}
+	}
+
+	/**
+	 * Registers the {@link ExecutionListener} object for this task. This object
+	 * will be notified about important events during the task execution.
+	 *
+	 * @param executionListener
+	 *        the object to be notified for important events during the task execution
+	 */
+
+	public void registerExecutionListener(final ExecutionListener executionListener) {
+
+		this.registeredListeners.add(executionListener);
+	}
+
+	/**
+	 * Unregisters the {@link ExecutionListener} object for this environment. This object
+	 * will no longer be notified about important events during the task execution.
+	 *
+	 * @param executionListener
+	 *        the lister object to be unregistered
+	 */
+
+	public void unregisterExecutionListener(final ExecutionListener executionListener) {
+
+		this.registeredListeners.remove(executionListener);
+	}
+
+
+	@Override
+	public boolean isCanceled() {
+
+		return this.isCanceled;
+	}
+
+	/**
+	 * Returns the runtime environment associated with this task.
+	 *
+	 * @return the runtime environment associated with this task
+	 */
+	public RuntimeEnvironment getRuntimeEnvironment() {
+
+		return this.environment;
+	}
 
-	TaskContext createTaskContext(TransferEnvelopeDispatcher transferEnvelopeDispatcher,
-			LocalBufferPoolOwner previousBufferPoolOwner);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
deleted file mode 100644
index 3c0002c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager;
-
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-
-/**
- * A <code>TaskKillResult</code> is used to report the results
- * of a task kill attempt. It contains the ID of the task to be killed, a return code and
- * a description. In case of an error during the kill operation the description includes an error message.
- * 
- */
-public class TaskKillResult extends AbstractTaskResult {
-
-	/**
-	 * Constructs a new task kill result.
-	 * 
-	 * @param vertexID
-	 *        the task ID this result belongs to
-	 * @param returnCode
-	 *        the return code of the kill
-	 */
-	public TaskKillResult(final ExecutionVertexID vertexID, final ReturnCode returnCode) {
-		super(vertexID, returnCode);
-	}
-
-	/**
-	 * Constructs an empty task kill result.
-	 */
-	public TaskKillResult() {
-		super();
-	}
-}