You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:30 UTC

[06/63] [abbrv] Refactor job graph construction to incremental attachment based

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index f074f3c..67e4700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -31,15 +31,12 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,29 +59,30 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ChannelManager;
-import org.apache.flink.runtime.io.network.InsufficientResourcesException;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.ipc.Server;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.net.NetUtils;
@@ -95,10 +93,9 @@ import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
 import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 import org.apache.flink.runtime.protocols.JobManagerProtocol;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.apache.flink.runtime.types.IntegerRecord;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.util.SerializableArrayList;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 
 /**
@@ -106,7 +103,6 @@ import org.apache.flink.util.StringUtils;
  * (or in case of an execution error) it reports the execution result back to the job manager.
  * Task managers are able to automatically discover the job manager and receive its configuration from it
  * as long as the job manager is running on the same local network
- * 
  */
 public class TaskManager implements TaskOperationProtocol {
 
@@ -114,10 +110,6 @@ public class TaskManager implements TaskOperationProtocol {
 
 	private static final int STARTUP_FAILURE_RETURN_CODE = 1;
 	
-	private static final int CRITICAL_ERROR_RETURN_CODE = 2;
-	
-	private static final int IPC_HANDLER_COUNT = 1;
-	
 	private static final int MAX_LOST_HEART_BEATS = 3;
 	
 	private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
@@ -127,42 +119,36 @@ public class TaskManager implements TaskOperationProtocol {
 	
 	// --------------------------------------------------------------------------------------------
 	
+	private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
+	
+	
 	private final InstanceConnectionInfo localInstanceConnectionInfo;
 	
 	private final HardwareDescription hardwareDescription;
 	
 	private final ExecutionMode executionMode;
 	
+	
 	private final JobManagerProtocol jobManager;
 
 	private final InputSplitProviderProtocol globalInputSplitProvider;
 
 	private final ChannelLookupProtocol lookupService;
-
-	private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
 	
 	private final AccumulatorProtocol accumulatorProtocolProxy;
 
+	
 	private final Server taskManagerServer;
 
 	private final FileCache fileCache = new FileCache();
 	
-	/**
-	 * This map contains all the tasks whose threads are in a state other than TERMINATED. If any task
-	 * is stored inside this map and its thread status is TERMINATED, this indicates a virtual machine error.
-	 * As a result, task status will switch to FAILED and reported to the {@link org.apache.flink.runtime.jobmanager.JobManager}.
-	 */
-	private final Map<ExecutionVertexID, Task> runningTasks = new ConcurrentHashMap<ExecutionVertexID, Task>();
+	/** All currently running tasks */
+	private final ConcurrentHashMap<ExecutionAttemptID, Task> runningTasks = new ConcurrentHashMap<ExecutionAttemptID, Task>();
 
-	/**
-	 * The instance of the {@link ChannelManager} which is responsible for
-	 * setting up and cleaning up the byte buffered channels of the tasks.
-	 */
+	/** The {@link ChannelManager} sets up and cleans up the data exchange channels of the tasks. */
 	private final ChannelManager channelManager;
 
-	/**
-	 * Instance of the task manager profile if profiling is enabled.
-	 */
+	/** Instance of the task manager profile if profiling is enabled. */
 	private final TaskManagerProfiler profiler;
 
 	private final MemoryManager memoryManager;
@@ -181,50 +167,39 @@ public class TaskManager implements TaskOperationProtocol {
 	private volatile boolean shutdownComplete;
 	
 	
-	/**
-	 * All parameters are obtained from the 
-	 * {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
-	 */
-	public TaskManager(ExecutionMode executionMode) throws Exception {
-		if (executionMode == null) {
-			throw new NullPointerException("Execution mode must not be null.");
+	// --------------------------------------------------------------------------------------------
+	//  Constructor & Shutdown
+	// --------------------------------------------------------------------------------------------
+	
+	public TaskManager(ExecutionMode executionMode, JobManagerProtocol jobManager, InputSplitProviderProtocol splitProvider, 
+			ChannelLookupProtocol channelLookup, AccumulatorProtocol accumulators,
+			InetSocketAddress jobManagerAddress, InetAddress taskManagerBindAddress)
+		throws Exception
+	{
+		if (executionMode == null || jobManager == null || splitProvider == null || channelLookup == null || accumulators == null) {
+			throw new NullPointerException();
 		}
 		
-		LOG.info("Execution mode: " + executionMode);
+		LOG.info("TaskManager execution mode: " + executionMode);
+		
 		this.executionMode = executionMode;
-
-		// IMPORTANT! At this point, the GlobalConfiguration must have been read!
+		this.jobManager = jobManager;
+		this.lookupService = channelLookup;
+		this.globalInputSplitProvider = splitProvider;
+		this.accumulatorProtocolProxy = accumulators;
 		
-		final InetSocketAddress jobManagerAddress;
+		// initialize the number of slots
 		{
-			LOG.info("Reading location of job manager from configuration");
-			
-			final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-			final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-			
-			if (address == null) {
-				throw new Exception("Job manager address not configured in the GlobalConfiguration.");
-			}
-	
-			// Try to convert configured address to {@link InetAddress}
-			try {
-				final InetAddress tmpAddress = InetAddress.getByName(address);
-				jobManagerAddress = new InetSocketAddress(tmpAddress, port);
-			}
-			catch (UnknownHostException e) {
-				LOG.fatal("Could not resolve JobManager host name.");
-				throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
+			int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
+			if (slots == -1) {
+				slots = 1;
+				LOG.info("Number of task slots not configured. Creating one task slot.");
+			} else if (slots <= 0) {
+				throw new Exception("Illegal value for the number of task slots: " + slots);
+			} else {
+				LOG.info("Creating " + slots + " task slot(s).");
 			}
-			
-			LOG.info("Connecting to JobManager at: " + jobManagerAddress);
-		}
-		
-		// Create RPC connection to the JobManager
-		try {
-			this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
-		} catch (IOException e) {
-			LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
-			throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
+			this.numberOfSlots = slots;
 		}
 		
 		int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
@@ -236,52 +211,18 @@ public class TaskManager implements TaskOperationProtocol {
 			dataPort = getAvailablePort();
 		}
 		
-		// Determine our own public facing address and start the server
-		{
-			final InetAddress taskManagerAddress;
-			try {
-				taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("The TaskManager failed to connect to the JobManager.", e);
-			}
-			
-			this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
-			LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
-
-			// Start local RPC server
-			try {
-				this.taskManagerServer = RPC.getServer(this, taskManagerAddress.getHostAddress(), ipcPort, IPC_HANDLER_COUNT);
-				this.taskManagerServer.start();
-			} catch (IOException e) {
-				LOG.fatal("Failed to start TaskManager server. " + e.getMessage(), e);
-				throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
-			}
-		}
-		
-		// Try to create local stub of the global input split provider
-		try {
-			this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
-		} catch (IOException e) {
-			LOG.fatal(e.getMessage(), e);
-			throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
-		}
-
-		// Try to create local stub for the lookup service
-		try {
-			this.lookupService = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
-		} catch (IOException e) {
-			LOG.fatal(e.getMessage(), e);
-			throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
-		}
+		this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerBindAddress, ipcPort, dataPort);
+		LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
 
-		// Try to create local stub for the accumulators
+		// Start local RPC server, give it the number of threads as we have slots
 		try {
-			this.accumulatorProtocolProxy = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+			this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numberOfSlots);
+			this.taskManagerServer.start();
 		} catch (IOException e) {
-			LOG.fatal("Failed to initialize accumulator protocol: " + e.getMessage(), e);
-			throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+			LOG.fatal("Failed to start TaskManager server. " + e.getMessage(), e);
+			throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
 		}
+		
 
 		// Load profiler if it should be used
 		if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
@@ -348,20 +289,6 @@ public class TaskManager implements TaskOperationProtocol {
 			LOG.error(StringUtils.stringifyException(ioe));
 			throw new Exception("Failed to instantiate ChannelManager.", ioe);
 		}
-
-		// initialize the number of slots
-		{
-			int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
-			if (slots == -1) {
-				slots = 1;
-				LOG.info("Number of task slots not configured. Creating one task slot.");
-			} else if (slots <= 0) {
-				throw new Exception("Illegal value for the number of task slots: " + slots);
-			} else {
-				LOG.info("Creating " + slots + " task slot(s).");
-			}
-			this.numberOfSlots = slots;
-		}
 		
 		// initialize the memory manager
 		{
@@ -457,626 +384,328 @@ public class TaskManager implements TaskOperationProtocol {
 			}).start();
 		}
 	}
-
-	private int getAvailablePort() {
-		ServerSocket serverSocket = null;
-		int port = 0;
-		for (int i = 0; i < 50; i++){
-			try {
-				serverSocket = new ServerSocket(0);
-				port = serverSocket.getLocalPort();
-				if (port != 0) {
-					serverSocket.close();
-					break;
-				}
-			} catch (IOException e) {
-				LOG.debug("Unable to allocate port " + e.getMessage(), e);
-			}
-		}
-		if (!serverSocket.isClosed()) {
-			try {
-				serverSocket.close();
-			} catch (IOException e) {
-				LOG.debug("error closing port",e);
-			}
-		}
-		return port;
-	}
-
+	
 	/**
-	 * Entry point for the program.
-	 * 
-	 * @param args
-	 *        arguments from the command line
-	 * @throws IOException 
+	 * Shuts the task manager down.
 	 */
-	@SuppressWarnings("static-access")
-	public static void main(String[] args) throws IOException {		
-		Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription(
-			"Specify configuration directory.").create("configDir");
-		// tempDir option is used by the YARN client.
-		Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)")
-				.hasArg().withDescription(
-				"Specify temporary directory.").create(ARG_CONF_DIR);
-		configDirOpt.setRequired(true);
-		tempDir.setRequired(false);
-		Options options = new Options();
-		options.addOption(configDirOpt);
-		options.addOption(tempDir);
+	public void shutdown() {
+		if (!this.shutdownStarted.compareAndSet(false, true)) {
+			return;
+		}
+
+		LOG.info("Shutting down TaskManager");
 		
+		// first, stop the heartbeat thread and wait for it to terminate
+		this.heartbeatThread.interrupt();
+		try {
+			this.heartbeatThread.join(1000);
+		} catch (InterruptedException e) {}
 
-		CommandLineParser parser = new GnuParser();
-		CommandLine line = null;
+		// Stop RPC proxy for the task manager
+		stopProxy(this.jobManager);
+
+		// Stop RPC proxy for the global input split assigner
+		stopProxy(this.globalInputSplitProvider);
+
+		// Stop RPC proxy for the lookup service
+		stopProxy(this.lookupService);
+
+		// Stop RPC proxy for accumulator reports
+		stopProxy(this.accumulatorProtocolProxy);
+
+		// Shut down the own RPC server
 		try {
-			line = parser.parse(options, args);
-		} catch (ParseException e) {
-			System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
-			System.exit(STARTUP_FAILURE_RETURN_CODE);
+			this.taskManagerServer.stop();
+		} catch (Throwable t) {
+			LOG.warn("TaskManager RPC server did not shut down properly.", t);
 		}
 
-		String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
-		String tempDirVal = line.getOptionValue(tempDir.getOpt(), null);
-
-		// First, try to load global configuration
-		GlobalConfiguration.loadConfiguration(configDir);
-		if(tempDirVal != null // the YARN TM runner has set a value for the temp dir
-				// the configuration does not contain a temp direcory
-				&& GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) {
-			Configuration c = GlobalConfiguration.getConfiguration();
-			c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal);
-			LOG.info("Setting temporary directory to "+tempDirVal);
-			GlobalConfiguration.includeConfiguration(c);
+		// Stop profiling if enabled
+		if (this.profiler != null) {
+			this.profiler.shutdown();
 		}
-		
-		// print some startup environment info, like user, code revision, etc
-		EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
-		
-		// Create a new task manager object
+
+		// Shut down the channel manager
 		try {
-			new TaskManager(ExecutionMode.CLUSTER);
-		} catch (Exception e) {
-			LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
-			System.exit(STARTUP_FAILURE_RETURN_CODE);
+			this.channelManager.shutdown();
+		} catch (Throwable t) {
+			LOG.warn("ChannelManager did not shutdown properly: " + t.getMessage(), t);
 		}
-		
-		// park the main thread to keep the JVM alive (all other threads may be daemon threads)
-		Object mon = new Object();
-		synchronized (mon) {
-			try {
-				mon.wait();
-			} catch (InterruptedException ex) {}
+
+		// Shut down the memory manager
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+		}
+
+		if (this.memoryManager != null) {
+			this.memoryManager.shutdown();
 		}
-	}
 
+		this.fileCache.shutdown();
 
+		// Shut down the executor service
+		if (this.executorService != null) {
+			this.executorService.shutdown();
+			try {
+				this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
+			} catch (InterruptedException e) {
+				LOG.debug(e);
+			}
+		}
 
+		this.shutdownComplete = true;
+	}
 	
 	/**
-	 * The states of address detection mechanism.
-	 * There is only a state transition if the current state failed to determine the address.
+	 * Checks whether the task manager has already been shut down.
+	 * 
+	 * @return <code>true</code> if the task manager has already been shut down, <code>false</code> otherwise
 	 */
-	private enum AddressDetectionState {
-		ADDRESS(50), 		//detect own IP based on the JobManagers IP address. Look for common prefix
-		FAST_CONNECT(50),	//try to connect to the JobManager on all Interfaces and all their addresses.
-							//this state uses a low timeout (say 50 ms) for fast detection.
-		SLOW_CONNECT(1000);	//same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
-		
-		
-		private int timeout;
-		AddressDetectionState(int timeout) {
-			this.timeout = timeout;
-		}
-		public int getTimeout() {
-			return timeout;
-		}
+	public boolean isShutDown() {
+		return this.shutdownComplete;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Properties
+	// --------------------------------------------------------------------------------------------
+	
+	public InstanceConnectionInfo getConnectionInfo() {
+		return this.localInstanceConnectionInfo;
+	}
+	
+	public ExecutionMode getExecutionMode() {
+		return this.executionMode;
+	}
+	
+	/**
+	 * Gets the ID under which the TaskManager is currently registered at its JobManager.
+	 * If the TaskManager has not been registered yet, or if it lost contact, this is null.
+	 * 
+	 * @return The ID under which the TaskManager is currently registered.
+	 */
+	public InstanceID getRegisteredId() {
+		return this.registeredId;
 	}
 	
 	/**
-	 * Find out the TaskManager's own IP address.
+	 * Checks if the TaskManager is properly registered and ready to receive work.
+	 * 
+	 * @return True, if the TaskManager is registered, false otherwise.
 	 */
-	private InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
-		AddressDetectionState strategy = AddressDetectionState.ADDRESS;
-
-		while (true) {
-			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-			while (e.hasMoreElements()) {
-				NetworkInterface n = e.nextElement();
-				Enumeration<InetAddress> ee = n.getInetAddresses();
-				while (ee.hasMoreElements()) {
-					InetAddress i = ee.nextElement();
-					switch (strategy) {
-					case ADDRESS:
-						if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
-							if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
-								LOG.info("Determined " + i + " as the TaskTracker's own IP address");
-								return i;
-							}
-						}
-						break;
-					case FAST_CONNECT:
-					case SLOW_CONNECT:
-						boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
-						if (correct) {
-							LOG.info("Determined " + i + " as the TaskTracker's own IP address");
-							return i;
-						}
-						break;
-					default:
-						throw new RuntimeException("Unkown address detection strategy: " + strategy);
-					}
-				}
-			}
-			// state control
-			switch (strategy) {
-			case ADDRESS:
-				strategy = AddressDetectionState.FAST_CONNECT;
-				break;
-			case FAST_CONNECT:
-				strategy = AddressDetectionState.SLOW_CONNECT;
-				break;
-			case SLOW_CONNECT:
-				throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
-			}
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Defaulting to detection strategy " + strategy);
-			}
-		}
-	}
-	
-	/**
-	 * Checks if two addresses have a common prefix (first 2 bytes).
-	 * Example: 192.168.???.???
-	 * Works also with ipv6, but accepts probably too many addresses
-	 */
-	private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
-		return address[0] == address2[0] && address[1] == address2[1];
-	}
-
-	public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
-				+ " with timeout " + timeout);
-		}
-		boolean connectable = true;
-		Socket socket = null;
-		try {
-			socket = new Socket();
-			SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
-																			// machine
-			socket.bind(bindP);
-			socket.connect(toSocket, timeout);
-		} catch (Exception ex) {
-			LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Failed with exception", ex);
-			}
-			connectable = false;
-		} finally {
-			if (socket != null) {
-				socket.close();
-			}
-		}
-		return connectable;
-	}
-	
+	public boolean isRegistered() {
+		return this.registeredId != null;
+	}
+	
+	public Map<ExecutionAttemptID, Task> getAllRunningTasks() {
+		return Collections.unmodifiableMap(this.runningTasks);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Task Operation
+	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
+	public TaskOperationResult cancelTask(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId) throws IOException {
 
-		final Task task = this.runningTasks.get(id);
+		final Task task = this.runningTasks.get(executionId);
 
 		if (task == null) {
-			final TaskCancelResult taskCancelResult = new TaskCancelResult(id,
-				AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
-			taskCancelResult.setDescription("No task with ID " + id + " is currently running");
-			return taskCancelResult;
+			return new TaskOperationResult(vertexId, subtaskIndex, executionId, false, "No task with that execution ID was found.");
 		}
 
 		// Pass call to executor service so IPC thread can return immediately
 		final Runnable r = new Runnable() {
-
 			@Override
 			public void run() {
-
-				// Finally, request user code to cancel
 				task.cancelExecution();
 			}
 		};
-
 		this.executorService.execute(r);
 
-		return new TaskCancelResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
-	}
-
-	@Override
-	public TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
-		final Task task = this.runningTasks.get(id);
-
-		if (task == null) {
-			final TaskKillResult taskKillResult = new TaskKillResult(id,
-					AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
-			taskKillResult.setDescription("No task with ID + " + id + " is currently running");
-			return taskKillResult;
-		}
-
-		// Pass call to executor service so IPC thread can return immediately
-		final Runnable r = new Runnable() {
-
-			@Override
-			public void run() {
-
-				// Finally, request user code to cancel
-				task.killExecution();
-			}
-		};
-
-		this.executorService.execute(r);
-
-		return new TaskKillResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
+		// return success
+		return new TaskOperationResult(vertexId, subtaskIndex, executionId, true);
 	}
 
 
 	@Override
-	public List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks) throws IOException {
-
-		final List<TaskSubmissionResult> submissionResultList = new SerializableArrayList<TaskSubmissionResult>();
-		final List<Task> tasksToStart = new ArrayList<Task>();
-
-		// Make sure all tasks are fully registered before they are started
-		for (final TaskDeploymentDescriptor tdd : tasks) {
-
-			final JobID jobID = tdd.getJobID();
-			final ExecutionVertexID vertexID = tdd.getVertexID();
-			RuntimeEnvironment re;
-
-			// retrieve the registered cache files from job configuration and create the local tmp file.
-			Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
-			for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
-				FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
-				cpTasks.put(e.getKey(), cp);
+	public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) {
+		final JobID jobID = tdd.getJobID();
+		final JobVertexID vertexId = tdd.getVertexID();
+		final ExecutionAttemptID executionId = tdd.getExecutionId();
+		final int taskIndex = tdd.getIndexInSubtaskGroup();
+		final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
+		
+		try {
+			final ClassLoader userCodeClassLoader = LibraryCacheManager.getClassLoader(jobID);
+			if (userCodeClassLoader == null) {
+				throw new Exception("No user code ClassLoader available.");
 			}
-
-			try {
-				re = new RuntimeEnvironment(tdd, this.memoryManager, this.ioManager, new TaskInputSplitProvider(jobID,
-					vertexID, this.globalInputSplitProvider), this.accumulatorProtocolProxy, cpTasks);
-			} catch (Throwable t) {
-				final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
-					AbstractTaskResult.ReturnCode.DEPLOYMENT_ERROR);
-				result.setDescription(StringUtils.stringifyException(t));
-				LOG.error(result.getDescription(), t);
-				submissionResultList.add(result);
-				continue;
+			
+			final Task task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this);
+			if (this.runningTasks.putIfAbsent(executionId, task) != null) {
+				throw new Exception("TaskManager contains already a task with executionId " + executionId);
 			}
-
-			final Configuration jobConfiguration = tdd.getJobConfiguration();
-
-			// Register the task
-			Task task;
+			
+			// another try/finally-success block to ensure that the tasks are removed properly in case of an exception
+			boolean success = false;
 			try {
-				task = createAndRegisterTask(vertexID, jobConfiguration, re);
-			} catch (InsufficientResourcesException e) {
-				final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
-					AbstractTaskResult.ReturnCode.INSUFFICIENT_RESOURCES);
-				result.setDescription(e.getMessage());
-				LOG.error(result.getDescription(), e);
-				submissionResultList.add(result);
-				continue;
-			}
-
-			if (task == null) {
-				final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
-					AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
-				result.setDescription("Task " + re.getTaskNameWithIndex() + " (" + vertexID + ") was already running");
-				LOG.error(result.getDescription());
-				submissionResultList.add(result);
-				continue;
-			}
-
-			submissionResultList.add(new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.SUCCESS));
-			tasksToStart.add(task);
-		}
-
-		// Now start the tasks
-		for (final Task task : tasksToStart) {
-			task.startExecution();
-		}
-
-		return submissionResultList;
-	}
-
-	/**
-	 * Registers an newly incoming runtime task with the task manager.
-	 * 
-	 * @param id
-	 *        the ID of the task to register
-	 * @param jobConfiguration
-	 *        the job configuration that has been attached to the original job graph
-	 * @param environment
-	 *        the environment of the task to be registered
-	 * @return the task to be started or <code>null</code> if a task with the same ID was already running
-	 */
-	private Task createAndRegisterTask(final ExecutionVertexID id, final Configuration jobConfiguration,
-			final RuntimeEnvironment environment)
-					throws InsufficientResourcesException, IOException {
-
-		if (id == null) {
-			throw new IllegalArgumentException("Argument id is null");
-		}
-
-		if (environment == null) {
-			throw new IllegalArgumentException("Argument environment is null");
-		}
-
-		// Task creation and registration must be atomic
-		Task task;
-
-		synchronized (this) {
-			final Task runningTask = this.runningTasks.get(id);
-			boolean registerTask = true;
-			if (runningTask == null) {
-				task = new Task(id, environment, this);
-			} else {
-
-				if (runningTask instanceof Task) {
-					// Task is already running
-					return null;
-				} else {
-					// There is already a replay task running, we will simply restart it
-					task = runningTask;
-					registerTask = false;
-				}
-
-			}
-
-			if (registerTask) {
-				// Register the task with the byte buffered channel manager
+				final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId);
+				final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
+				task.setEnvironment(env);
+				
+				// register the task with the network stack and profilers
 				this.channelManager.register(task);
-
-				boolean enableProfiling = false;
-				if (this.profiler != null && jobConfiguration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
-					enableProfiling = true;
-				}
-
+				
+				final Configuration jobConfig = tdd.getJobConfiguration();
+	
+				boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true);
+	
 				// Register environment, input, and output gates for profiling
 				if (enableProfiling) {
-					task.registerProfiler(this.profiler, jobConfiguration);
+					task.registerProfiler(this.profiler, jobConfig);
 				}
-
-				this.runningTasks.put(id, task);
-			}
-		}
-		return task;
-	}
-
-	/**
-	 * Unregisters a finished or aborted task.
-	 * 
-	 * @param id
-	 *        the ID of the task to be unregistered
-	 */
-	private void unregisterTask(final ExecutionVertexID id) {
-
-		// Task de-registration must be atomic
-		synchronized (this) {
-
-			final Task task = this.runningTasks.remove(id);
-			if (task == null) {
-				LOG.error("Cannot find task with ID " + id + " to unregister");
-				return;
-			}
-
-			// remove the local tmp file for unregistered tasks.
-			for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
-				this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
-			}
-			// Unregister task from the byte buffered channel manager
-			this.channelManager.unregister(id, task);
-
-			// Unregister task from profiling
-			task.unregisterProfiler(this.profiler);
-
-			// Unregister task from memory manager
-			task.unregisterMemoryManager(this.memoryManager);
-
-			// Unregister task from library cache manager
-			try {
-				LibraryCacheManager.unregister(task.getJobID());
-			} catch (IOException e) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Unregistering the job vertex ID " + id + " caused an IOException");
+				
+				// now that the task is successfully created and registered, we can start copying the
+				// distributed cache temp files
+				Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
+				for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
+					FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
+					cpTasks.put(e.getKey(), cp);
+				}
+				env.addCopyTasksForCacheFile(cpTasks);
+			
+				if (!task.startExecution()) {
+					throw new Exception("Cannot start task. Task was canceled or failed.");
 				}
+			
+				success = true;
+				return new TaskOperationResult(vertexId, taskIndex, executionId, true);
 			}
-		}
-	}
-
-
-	@Override
-	public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException {
-
-		LibraryCacheProfileResponse response = new LibraryCacheProfileResponse(request);
-		String[] requiredLibraries = request.getRequiredLibraries();
-
-		for (int i = 0; i < requiredLibraries.length; i++) {
-			if (LibraryCacheManager.contains(requiredLibraries[i]) == null) {
-				response.setCached(i, false);
-			} else {
-				response.setCached(i, true);
+			finally {
+				if (!success) {
+					// remove task 
+					this.runningTasks.remove(executionId);
+					// delete distributed cache files
+					for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
+						this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), jobID);
+					}
+				}
 			}
 		}
-
-		return response;
-	}
-
-
-	@Override
-	public void updateLibraryCache(LibraryCacheUpdate update) throws IOException {
-		// Nothing to to here
-	}
-
-	public void executionStateChanged(final JobID jobID, final ExecutionVertexID id,
-			final ExecutionState newExecutionState, final String optionalDescription) {
-
-		// Don't propagate state CANCELING back to the job manager
-		if (newExecutionState == ExecutionState.CANCELING) {
-			return;
-		}
-
-		if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
-				|| newExecutionState == ExecutionState.FAILED) {
-
-			// Unregister the task (free all buffers, remove all channels, task-specific class loaders, etc...)
-			unregisterTask(id);
-		}
-		// Get lock on the jobManager object and propagate the state change
-		synchronized (this.jobManager) {
+		catch (Throwable t) {
+			LOG.error("Could not instantiate task", t);
+			
 			try {
-				this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, id, newExecutionState,
-					optionalDescription));
+				LibraryCacheManager.unregister(jobID);
 			} catch (IOException e) {
-				LOG.error(e);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+				}
 			}
+			
+			return new TaskOperationResult(vertexId, taskIndex, executionId, false, ExceptionUtils.stringifyException(t));
 		}
 	}
 
 	/**
-	 * Shuts the task manager down.
+	 * Unregisters a finished or aborted task.
+	 * 
+	 * @param executionId
+	 *        the ID of the task to be unregistered
 	 */
-	public void shutdown() {
+	private void unregisterTask(ExecutionAttemptID executionId) {
 
-		if (!this.shutdownStarted.compareAndSet(false, true)) {
+		// Task de-registration must be atomic
+		final Task task = this.runningTasks.remove(executionId);
+		if (task == null) {
+			LOG.error("Cannot find task with ID " + executionId + " to unregister");
 			return;
 		}
 
-		LOG.info("Shutting down TaskManager");
+		// remove the local tmp file for unregistered tasks.
+		for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
+			this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
+		}
 		
-		// first, stop the heartbeat thread and wait for it to terminate
-		this.heartbeatThread.interrupt();
-		try {
-			this.heartbeatThread.join(1000);
-		} catch (InterruptedException e) {}
-
-		// Stop RPC proxy for the task manager
-		RPC.stopProxy(this.jobManager);
-
-		// Stop RPC proxy for the global input split assigner
-		RPC.stopProxy(this.globalInputSplitProvider);
+		// Unregister task from the byte buffered channel manager
+		this.channelManager.unregister(executionId, task);
 
-		// Stop RPC proxy for the lookup service
-		RPC.stopProxy(this.lookupService);
+		// Unregister task from profiling
+		task.unregisterProfiler(this.profiler);
 
-		// Stop RPC proxy for accumulator reports
-		RPC.stopProxy(this.accumulatorProtocolProxy);
-
-		// Shut down the own RPC server
-		this.taskManagerServer.stop();
+		// Unregister task from memory manager
+		task.unregisterMemoryManager(this.memoryManager);
 
-		// Stop profiling if enabled
-		if (this.profiler != null) {
-			this.profiler.shutdown();
-		}
-
-		// Shut down the channel manager
+		// Unregister task from library cache manager
 		try {
-			this.channelManager.shutdown();
+			LibraryCacheManager.unregister(task.getJobID());
 		} catch (IOException e) {
-			LOG.warn("ChannelManager did not shutdown properly: " + e.getMessage(), e);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+			}
 		}
+	}
 
-		// Shut down the memory manager
-		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+	public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState2 newExecutionState, String optionalDescription) {
+		
+		// Propagate the state change to the JobManager via RPC.
+		// The success flag must be set after the call, otherwise the finally
+		// block below would unregister the task on EVERY state update.
+		boolean success = false;
+		try {
+			this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionId, newExecutionState, optionalDescription));
+			success = true;
 		}
-
-		if (this.memoryManager != null) {
-			this.memoryManager.shutdown();
+		catch (Throwable t) {
+			String msg = "Error sending task state update to JobManager.";
+			LOG.error(msg, t);
+			ExceptionUtils.rethrow(t, msg);
 		}
-
-		this.fileCache.shutdown();
-
-		// Shut down the executor service
-		if (this.executorService != null) {
-			this.executorService.shutdown();
-			try {
-				this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
-			} catch (InterruptedException e) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(e);
-				}
+		finally {
+			// in case of a failure, or when the task is in a terminal state, unregister the
+			// task (free all buffers, remove all channels, task-specific class loaders, etc...)
+			if (!success || newExecutionState == ExecutionState2.FINISHED || newExecutionState == ExecutionState2.CANCELED
+					|| newExecutionState == ExecutionState2.FAILED)
+			{
+				
+				unregisterTask(executionId);
+			}
 		}
-
-		this.shutdownComplete = true;
 	}
 
 	/**
-	 * Checks whether the task manager has already been shut down.
-	 * 
-	 * @return <code>true</code> if the task manager has already been shut down, <code>false</code> otherwise
+	 * Removes all tasks from this TaskManager.
 	 */
-	public boolean isShutDown() {
-		return this.shutdownComplete;
-	}
-
-	@Override
-	public void logBufferUtilization() {
-		this.channelManager.logBufferUtilization();
-	}
-
-	@Override
-	public void killTaskManager() throws IOException {
-		// Kill the entire JVM after a delay of 10ms, so this RPC will finish properly before
-		final Timer timer = new Timer();
-		final TimerTask timerTask = new TimerTask() {
-
-			@Override
-			public void run() {
-				System.exit(0);
-			}
-		};
-
-		timer.schedule(timerTask, 10L);
-	}
-
-
-	@Override
-	public void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
-		this.channelManager.invalidateLookupCacheEntries(channelIDs);
-	}
-
 	public void cancelAndClearEverything() {
 		LOG.info("Cancelling all computations and discarding all cached data.");
+		for (Task t : runningTasks.values()) {
+			t.cancelExecution();
+			runningTasks.remove(t.getExecutionId());
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	//  Properties
+	//  Library caching
 	// --------------------------------------------------------------------------------------------
 	
-	public InstanceConnectionInfo getConnectionInfo() {
-		return this.localInstanceConnectionInfo;
-	}
-	
-	public ExecutionMode getExecutionMode() {
-		return this.executionMode;
-	}
-	
-	/**
-	 * Gets the ID under which the TaskManager is currently registered at its JobManager.
-	 * If the TaskManager has not been registered, yet, or if it lost contact, this is is null.
-	 * 
-	 * @return The ID under which the TaskManager is currently registered.
-	 */
-	public InstanceID getRegisteredId() {
-		return this.registeredId;
+	@Override
+	public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException {
+
+		LibraryCacheProfileResponse response = new LibraryCacheProfileResponse(request);
+		String[] requiredLibraries = request.getRequiredLibraries();
+
+		for (int i = 0; i < requiredLibraries.length; i++) {
+			if (LibraryCacheManager.contains(requiredLibraries[i]) == null) {
+				response.setCached(i, false);
+			} else {
+				response.setCached(i, true);
+			}
+		}
+
+		return response;
 	}
 	
-	/**
-	 * Checks if the TaskManager is properly registered and ready to receive work.
-	 * 
-	 * @return True, if the TaskManager is registered, false otherwise.
-	 */
-	public boolean isRegistered() {
-		return this.registeredId != null;
+	@Override
+	public void updateLibraryCache(LibraryCacheUpdate update) throws IOException {
+		// Nothing to do here, because the libraries are added to the cache when the
+		// update is deserialized (WE SHOULD CHANGE THAT!!!)
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -1243,7 +872,187 @@ public class TaskManager implements TaskOperationProtocol {
 
 		return str.toString();
 	}
+
+	
+	// --------------------------------------------------------------------------------------------
+	//  Execution & Initialization
+	// --------------------------------------------------------------------------------------------
+	
+	public static TaskManager createTaskManager(ExecutionMode mode) throws Exception {
+		
+		// IMPORTANT! At this point, the GlobalConfiguration must have been read!
+		
+		final InetSocketAddress jobManagerAddress;
+		LOG.info("Reading location of job manager from configuration");
+					
+		final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+					
+		if (address == null) {
+			throw new Exception("Job manager address not configured in the GlobalConfiguration.");
+		}
+			
+		// Try to convert configured address to {@link InetAddress}
+		try {
+			final InetAddress tmpAddress = InetAddress.getByName(address);
+			jobManagerAddress = new InetSocketAddress(tmpAddress, port);
+		}
+		catch (UnknownHostException e) {
+			LOG.fatal("Could not resolve JobManager host name.");
+			throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
+		}
+		
+		return createTaskManager(mode, jobManagerAddress);
+	}
+	
+	public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress) throws Exception {
+		// Determine our own public facing address and start the server
+		final InetAddress taskManagerAddress;
+		try {
+			taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
+		}
+		catch (IOException e) {
+			throw new Exception("The TaskManager failed to determine the IP address of the interface that connects to the JobManager.", e);
+		}
+		
+		return createTaskManager(mode, jobManagerAddress, taskManagerAddress);
+	}
+	
+	
+	public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress, InetAddress taskManagerAddress) throws Exception {
+		
+		// IMPORTANT! At this point, the GlobalConfiguration must have been read!
+		
+		LOG.info("Connecting to JobManager at: " + jobManagerAddress);
+		
+		// Create RPC connections to the JobManager
+		
+		JobManagerProtocol jobManager = null;
+		InputSplitProviderProtocol splitProvider = null;
+		ChannelLookupProtocol channelLookup = null;
+		AccumulatorProtocol accumulators = null;
+		
+		// try/finally block to close proxies if anything goes wrong
+		boolean success = false;
+		try {
+			// create the RPC call proxy to the job manager for jobs
+			try {
+				jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+			}
+			catch (IOException e) {
+				LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
+				throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
+			}
+			
+			// Try to create local stub of the global input split provider
+			try {
+				splitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+			}
+			catch (IOException e) {
+				LOG.fatal(e.getMessage(), e);
+				throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
+			}
+
+			// Try to create local stub for the lookup service
+			try {
+				channelLookup = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+			}
+			catch (IOException e) {
+				LOG.fatal(e.getMessage(), e);
+				throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
+			}
+
+			// Try to create local stub for the accumulators
+			try {
+				accumulators = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+			}
+			catch (IOException e) {
+				LOG.fatal("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+				throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+			}
+			
+			TaskManager tm = new TaskManager(mode, jobManager, splitProvider, channelLookup, accumulators, jobManagerAddress, taskManagerAddress);
+			success = true;
+			return tm;
+		}
+		finally {
+			if (!success) {
+				stopProxy(jobManager);
+				stopProxy(splitProvider);
+				stopProxy(channelLookup);
+				stopProxy(accumulators);
+			}
+		}
+	}
+	
+
+	// --------------------------------------------------------------------------------------------
+	//  Executable
+	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * Entry point for the TaskManager executable.
+	 * 
+	 * @param args Arguments from the command line
+	 * @throws IOException 
+	 */
+	@SuppressWarnings("static-access")
+	public static void main(String[] args) throws IOException {		
+		Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription(
+			"Specify configuration directory.").create("configDir");
+		// tempDir option is used by the YARN client.
+		Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)")
+				.hasArg().withDescription(
+				"Specify temporary directory.").create(ARG_CONF_DIR);
+		configDirOpt.setRequired(true);
+		tempDir.setRequired(false);
+		Options options = new Options();
+		options.addOption(configDirOpt);
+		options.addOption(tempDir);
+		
+
+		CommandLineParser parser = new GnuParser();
+		CommandLine line = null;
+		try {
+			line = parser.parse(options, args);
+		} catch (ParseException e) {
+			System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+		}
+
+		String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
+		String tempDirVal = line.getOptionValue(tempDir.getOpt(), null);
+
+		// First, try to load global configuration
+		GlobalConfiguration.loadConfiguration(configDir);
+		if(tempDirVal != null // the YARN TM runner has set a value for the temp dir
+				// the configuration does not contain a temp directory
+				&& GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) {
+			Configuration c = GlobalConfiguration.getConfiguration();
+			c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal);
+			LOG.info("Setting temporary directory to "+tempDirVal);
+			GlobalConfiguration.includeConfiguration(c);
+		}
+		
+		// print some startup environment info, like user, code revision, etc
+		EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
+		
+		// Create a new task manager object
+		try {
+			createTaskManager(ExecutionMode.CLUSTER);
+		} catch (Exception e) {
+			LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+		}
+		
+		// park the main thread to keep the JVM alive (all other threads may be daemon threads)
+		Object mon = new Object();
+		synchronized (mon) {
+			try {
+				mon.wait();
+			} catch (InterruptedException ex) {}
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	// Miscellaneous Utilities
@@ -1268,30 +1077,169 @@ public class TaskManager implements TaskOperationProtocol {
 			if (!f.exists()) {
 				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
 			}
-
 			if (!f.isDirectory()) {
 				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
 			}
-
 			if (!f.canWrite()) {
 				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
 			}
 		}
 	}
 	
-	public static class EmergencyShutdownExceptionHandler implements Thread.UncaughtExceptionHandler {
+	/**
+	 * Stops the given RPC protocol proxy, if it is not null.
+	 * This method never throws an exception, it only logs errors.
+	 * 
+	 * @param protocol The protocol proxy to stop.
+	 */
+	private static final void stopProxy(VersionedProtocol protocol) {
+		if (protocol != null) {
+			try {
+				RPC.stopProxy(protocol);
+			}
+			catch (Throwable t) {
+				LOG.error("Error while shutting down RPC proxy.", t);
+			}
+		}
+	}
+	
+	/**
+	 * Determines the IP address of the interface from which the TaskManager can connect to the given JobManager
+	 * IP address.
+	 * 
+	 * @param jobManagerAddress The socket address to connect to.
+	 * @return The IP address of the interface that connects to the JobManager.
+	 * @throws IOException If no connection could be established.
+	 */
+	private static InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
+		// Start with the cheapest strategy (common address prefix) and escalate
+		// to actual connection attempts with growing timeouts.
+		AddressDetectionState strategy = AddressDetectionState.ADDRESS;
 
+		while (true) {
+			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+			while (e.hasMoreElements()) {
+				NetworkInterface n = e.nextElement();
+				Enumeration<InetAddress> ee = n.getInetAddresses();
+				while (ee.hasMoreElements()) {
+					InetAddress i = ee.nextElement();
+					switch (strategy) {
+					case ADDRESS:
+						if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
+							if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
+								LOG.info("Determined " + i + " as the TaskManager's own IP address");
+								return i;
+							}
+						}
+						break;
+					case FAST_CONNECT:
+					case SLOW_CONNECT:
+						boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
+						if (correct) {
+							LOG.info("Determined " + i + " as the TaskManager's own IP address");
+							return i;
+						}
+						break;
+					default:
+						throw new RuntimeException("Unknown address detection strategy: " + strategy);
+					}
+				}
+			}
+			// state control: escalate to the next detection strategy, or give up
+			switch (strategy) {
+			case ADDRESS:
+				strategy = AddressDetectionState.FAST_CONNECT;
+				break;
+			case FAST_CONNECT:
+				strategy = AddressDetectionState.SLOW_CONNECT;
+				break;
+			case SLOW_CONNECT:
+				throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
+			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Defaulting to detection strategy " + strategy);
+			}
+		}
+	}
+	
+	/**
+	 * Searches for an available free port and returns the port number.
+	 * 
+	 * @return An available port.
+	 * @throws RuntimeException Thrown, if no free port was found.
+	 */
+	private static final int getAvailablePort() {
+		for (int i = 0; i < 50; i++) {
+			ServerSocket serverSocket = null;
+			try {
+				serverSocket = new ServerSocket(0);
+				int port = serverSocket.getLocalPort();
+				if (port != 0) {
+					return port;
+				}
+			} catch (IOException e) {
+				LOG.debug("Unable to allocate port " + e.getMessage(), e);
+			} finally {
+				if (serverSocket != null) {
+					try { serverSocket.close(); } catch (Throwable t) {}
+				}
+			}
 		}
 		
-		@Override
-		public void uncaughtException(Thread t, Throwable e) {
-			LOG.fatal("Thread " + t.getName() + " caused an unrecoverable exception.", e);
-			tm.shutdown();
+		throw new RuntimeException("Could not find a free permitted port on the machine.");
+	}
+	
+	/**
+	 * Checks if two addresses have a common prefix (first 2 bytes).
+	 * Example: 192.168.???.???
+	 * Works also with ipv6, but accepts probably too many addresses
+	 */
+	private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
+		return address[0] == address2[0] && address[1] == address2[1];
+	}
+
+	private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+				+ " with timeout " + timeout);
+		}
+		boolean connectable = true;
+		Socket socket = null;
+		try {
+			socket = new Socket();
+			SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
+																			// machine
+			socket.bind(bindP);
+			socket.connect(toSocket, timeout);
+		} catch (Exception ex) {
+			LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Failed with exception", ex);
+			}
+			connectable = false;
+		} finally {
+			if (socket != null) {
+				socket.close();
+			}
 		}
+		return connectable;
+	}
+	
+	/**
+	 * The states of address detection mechanism.
+	 * There is only a state transition if the current state failed to determine the address.
+	 */
+	private enum AddressDetectionState {
+		ADDRESS(50), 		//detect own IP based on the JobManagers IP address. Look for common prefix
+		FAST_CONNECT(50),	//try to connect to the JobManager on all Interfaces and all their addresses.
+							//this state uses a low timeout (say 50 ms) for fast detection.
+		SLOW_CONNECT(1000);	//same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
 		
+		
+		private int timeout;
+		AddressDetectionState(int timeout) {
+			this.timeout = timeout;
+		}
+		public int getTimeout() {
+			return timeout;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
new file mode 100644
index 0000000..f0f00a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.types.StringValue;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * The result of a task operation (e.g. a task submission), reported by the
+ * TaskManager to the JobManager. It identifies the task via its vertex id,
+ * subtask index, and execution attempt id, and carries a success flag plus an
+ * optional description (typically an error message when the operation failed).
+ */
+public class TaskOperationResult implements IOReadableWritable {
+
+	// id of the vertex this result belongs to
+	private JobVertexID vertexId;
+	
+	// index of the subtask within the vertex
+	private int subtaskIndex;
+	
+	// id of the concrete execution attempt
+	private ExecutionAttemptID executionId;
+	
+	// whether the operation succeeded
+	private boolean success;
+	
+	// optional description, may be null (e.g. a stringified exception on failure)
+	private String description;
+
+
+	/** Default constructor for deserialization only. */
+	public TaskOperationResult() {
+		this(new JobVertexID(), -1, new ExecutionAttemptID(), false);
+	}
+	
+	public TaskOperationResult(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, boolean success) {
+		this(vertexId, subtaskIndex, executionId, success, null);
+	}
+	
+	public TaskOperationResult(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, boolean success, String description) {
+		Preconditions.checkNotNull(vertexId);
+		Preconditions.checkNotNull(executionId);
+		
+		this.vertexId = vertexId;
+		this.subtaskIndex = subtaskIndex;
+		this.executionId = executionId;
+		this.success = success;
+		this.description = description;
+	}
+
+
+	public JobVertexID getVertexId() {
+		return vertexId;
+	}
+	
+	public int getSubtaskIndex() {
+		return subtaskIndex;
+	}
+	
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
+	
+	public boolean isSuccess() {
+		return success;
+	}
+
+	public String getDescription() {
+		return description;
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.vertexId.read(in);
+		this.subtaskIndex = in.readInt();
+		// BUGFIX: the execution attempt id must be part of the wire format,
+		// otherwise the receiver gets a fresh default id and cannot associate
+		// this result with the correct execution attempt
+		this.executionId.read(in);
+		this.success = in.readBoolean();
+		
+		// the description is optional; a boolean flag marks its presence
+		if (in.readBoolean()) {
+			this.description = StringValue.readString(in);
+		}
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		this.vertexId.write(out);
+		out.writeInt(subtaskIndex);
+		// keep in sync with read(): serialize the execution attempt id
+		this.executionId.write(out);
+		out.writeBoolean(success);
+		
+		if (description != null) {
+			out.writeBoolean(true);
+			StringValue.writeString(description, out);
+		} else {
+			out.writeBoolean(false);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java
deleted file mode 100644
index ae88f3b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-
-/**
- * A <code>TaskSubmissionResult</code> is used to report the results
- * of a task submission. It contains the ID of the submitted task, a return code and
- * a description. In case of a submission error the description includes an error message.
- * 
- */
-public class TaskSubmissionResult extends AbstractTaskResult {
-
-	/**
-	 * Constructs a new task submission result.
-	 * 
-	 * @param vertexID
-	 *        the task ID this result belongs to
-	 * @param returnCode
-	 *        the return code of the submission
-	 */
-	public TaskSubmissionResult(ExecutionVertexID vertexID, ReturnCode returnCode) {
-		super(vertexID, returnCode);
-	}
-
-	/**
-	 * Constructs an empty task submission result.
-	 */
-	public TaskSubmissionResult() {
-		super();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
index 9694f63..312c9ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
@@ -16,18 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.flink.core.io.StringRecord;
+import org.apache.flink.types.StringValue;
 
 /**
  * Auxiliary class to (de)serialize enumeration values.
- * 
  */
 public final class EnumUtils {
 
@@ -56,7 +54,7 @@ public final class EnumUtils {
 			return null;
 		}
 
-		return T.valueOf(enumType, StringRecord.readString(in));
+		return T.valueOf(enumType, StringValue.readString(in));
 	}
 
 	/**
@@ -75,7 +73,7 @@ public final class EnumUtils {
 			out.writeBoolean(false);
 		} else {
 			out.writeBoolean(true);
-			StringRecord.writeString(out, enumVal.name());
+			StringValue.writeString(enumVal.name(), out);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 964a754..fa7ad1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -19,20 +19,46 @@
 package org.apache.flink.runtime.util;
 
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class ExecutorThreadFactory implements ThreadFactory {
 	
+	private static final Log LOG = LogFactory.getLog(ExecutorThreadFactory.class);
+	
+	
+	private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - ";
+	
+	private static final AtomicInteger COUNTER = new AtomicInteger(1);
+	
+	private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads");
+	
+	private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHandler();
+	
+	
 	public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-
-	private static final String THREAD_NAME = "Flink Executor Thread";
 	
+	// --------------------------------------------------------------------------------------------
 	
 	private ExecutorThreadFactory() {}
 	
 	
 	public Thread newThread(Runnable target) {
-		Thread t = new Thread(target, THREAD_NAME);
+		Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement());
 		t.setDaemon(true);
+		t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
 		return t;
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class LoggingExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+		@Override
+		public void uncaughtException(Thread t, Throwable e) {
+			LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
index d4db568..5bb9451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
@@ -110,45 +110,11 @@ public class SerializableArrayList<E extends IOReadableWritable> extends ArrayLi
 
 
 	@Override
-	public boolean equals(final Object obj) {
-
+	public boolean equals(Object obj) {
 		if (!(obj instanceof SerializableArrayList<?>)) {
 			return false;
 		}
 
-		final SerializableArrayList<?> sal = (SerializableArrayList<?>) obj;
-
-		if (this.size() != sal.size()) {
-			return false;
-		}
-
-		final Iterator<E> it = iterator();
-		final Iterator<?> it2 = sal.iterator();
-		while (it.hasNext()) {
-
-			final E e = it.next();
-			final Object obj2 = it2.next();
-			if (!e.equals(obj2)) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		int hashCode = Integer.MIN_VALUE;
-
-		if (!isEmpty()) {
-			final E e = get(0);
-			hashCode += Math.abs(e.getClass().hashCode());
-		}
-
-		hashCode += size();
-
-		return hashCode;
+		return super.equals(obj);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
index bc3173f..9ae6e2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.client;
 
 import static org.junit.Assert.assertEquals;
@@ -36,7 +35,6 @@ import org.junit.Test;
 
 /**
  * This class contains test concerning all classes which are derived from {@link AbstractJobResult}.
- * 
  */
 public class JobResultTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
index d0b1504..ffbcf41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.deployment;
 
 import static org.junit.Assert.assertEquals;
@@ -30,10 +29,7 @@ import org.apache.flink.runtime.testutils.ServerTestUtils;
 import org.apache.flink.util.StringUtils;
 import org.junit.Test;
 
-/**
- * This class contains unit tests for the {@link ChannelDeploymentDescriptor} class.
- * 
- */
+
 public class ChannelDeploymentDescriptorTest {
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java
deleted file mode 100644
index 68d285a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.deployment;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-/**
- * This class contains unit tests for the {@link GateDeploymentDescriptor} class.
- * 
- */
-public class GateDeploymentDescriptorTest {
-
-	/**
-	 * Tests the constructor of the {@link GateDeploymentDescriptor} class with valid arguments.
-	 */
-	@Test
-	public void testConstructorWithValidArguments() {
-
-		final GateID gateID = new GateID();
-		final ChannelType channelType = ChannelType.IN_MEMORY;
-		final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
-
-		final GateDeploymentDescriptor gdd = new GateDeploymentDescriptor(gateID, channelType, channels);
-
-		assertEquals(gateID, gdd.getGateID());
-		assertEquals(channelType, gdd.getChannelType());
-		assertEquals(channels.size(), gdd.getNumberOfChannelDescriptors());
-	}
-
-	/**
-	 * Tests the constructor of the {@link GateDeploymentDescriptor} class with valid arguments.
-	 */
-	@Test
-	public void testConstructorWithInvalidArguments() {
-
-		final GateID gateID = new GateID();
-		final ChannelType channelType = ChannelType.IN_MEMORY;
-		final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
-
-		boolean firstExceptionCaught = false;
-		boolean secondExceptionCaught = false;
-		boolean thirdExceptionCaught = false;
-
-		try {
-			new GateDeploymentDescriptor(null, channelType, channels);
-		} catch (IllegalArgumentException e) {
-			firstExceptionCaught = true;
-		}
-
-		try {
-			new GateDeploymentDescriptor(gateID, null, channels);
-		} catch (IllegalArgumentException e) {
-			secondExceptionCaught = true;
-		}
-
-		try {
-			new GateDeploymentDescriptor(gateID, channelType, null);
-		} catch (IllegalArgumentException e) {
-			thirdExceptionCaught = true;
-		}
-
-		if (!firstExceptionCaught) {
-			fail("First argument was illegal but not detected");
-		}
-
-		if (!secondExceptionCaught) {
-			fail("Second argument was illegal but not detected");
-		}
-
-
-		if (!thirdExceptionCaught) {
-			fail("Third argument was illegal but not detected");
-		}
-	}
-
-	/**
-	 * Tests the serialization/deserialization of the {@link GateDeploymentDescriptor} class.
-	 */
-	@Test
-	public void testSerialization() {
-
-		final GateID gateID = new GateID();
-		final ChannelType channelType = ChannelType.IN_MEMORY;
-		final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
-		final ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(new ChannelID(), new ChannelID());
-		channels.add(cdd);
-
-		final GateDeploymentDescriptor orig = new GateDeploymentDescriptor(gateID, channelType,
-			channels);
-
-		GateDeploymentDescriptor copy = null;
-
-		try {
-			copy = ServerTestUtils.createCopy(orig);
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		}
-
-		assertFalse(orig.getGateID() == copy.getGateID());
-
-		assertEquals(orig.getGateID(), copy.getGateID());
-		assertEquals(orig.getChannelType(), copy.getChannelType());
-		assertEquals(orig.getNumberOfChannelDescriptors(), copy.getNumberOfChannelDescriptors());
-		assertEquals(orig.getChannelDescriptor(0).getOutputChannelID(), copy.getChannelDescriptor(0)
-			.getOutputChannelID());
-		assertEquals(orig.getChannelDescriptor(0).getInputChannelID(), copy.getChannelDescriptor(0).getInputChannelID());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 3a1aa40..1f66f93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -16,275 +16,64 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.deployment;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.apache.flink.runtime.util.SerializableArrayList;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 
-/**
- * This class contains unit tests for the {@link TaskDeploymentDescriptor} class.
- * 
- */
 public class TaskDeploymentDescriptorTest {
-	/**
-	 * Tests the constructor of the {@link TaskDeploymentDescriptor} class with valid arguments.
-	 */
-	@Test
-	public void testConstructorWithValidArguments() {
-
-		final JobID jobID = new JobID();
-		final ExecutionVertexID vertexID = new ExecutionVertexID();
-		final String taskName = "task name";
-		final int indexInSubtaskGroup = 0;
-		final int currentNumberOfSubtasks = 1;
-		final Configuration jobConfiguration = new Configuration();
-		final Configuration taskConfiguration = new Configuration();
-		final Class<? extends AbstractInvokable> invokableClass =  RegularPactTask.class;
-		final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
-			0);
-		final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
-			0);
-
-		final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-			indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-			invokableClass, outputGates, inputGates);
 
-		assertEquals(jobID, tdd.getJobID());
-		assertEquals(vertexID, tdd.getVertexID());
-		assertEquals(taskName, tdd.getTaskName());
-		assertEquals(indexInSubtaskGroup, tdd.getIndexInSubtaskGroup());
-		assertEquals(currentNumberOfSubtasks, tdd.getCurrentNumberOfSubtasks());
-		assertEquals(jobConfiguration, tdd.getJobConfiguration());
-		assertEquals(taskConfiguration, tdd.getTaskConfiguration());
-		assertEquals(invokableClass, tdd.getInvokableClass());
-		assertEquals(outputGates.size(), tdd.getNumberOfOutputGateDescriptors());
-		assertEquals(inputGates.size(), tdd.getNumberOfInputGateDescriptors());
-	}
-
-	/**
-	 * Tests the constructor of the {@link GateDeploymentDescriptor} class with valid arguments.
-	 */
-	@Test
-	public void testConstructorWithInvalidArguments() {
-
-		final JobID jobID = new JobID();
-		final ExecutionVertexID vertexID = new ExecutionVertexID();
-		final String taskName = "task name";
-		final int indexInSubtaskGroup = 0;
-		final int currentNumberOfSubtasks = 1;
-		final Configuration jobConfiguration = new Configuration();
-		final Configuration taskConfiguration = new Configuration();
-		final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
-		final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
-			0);
-		final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
-			0);
-
-		boolean firstExceptionCaught = false;
-		boolean secondExceptionCaught = false;
-		boolean thirdExceptionCaught = false;
-		boolean forthExceptionCaught = false;
-		boolean fifthExceptionCaught = false;
-		boolean sixthExceptionCaught = false;
-		boolean seventhExceptionCaught = false;
-		boolean eighthExceptionCaught = false;
-		boolean ninethExeceptionCaught = false;
-		boolean tenthExceptionCaught = false;
-
-		try {
-			new TaskDeploymentDescriptor(null, vertexID, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			firstExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, null, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			secondExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, null,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			thirdExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				-1, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			forthExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				indexInSubtaskGroup, -1, jobConfiguration, taskConfiguration,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			fifthExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, null, taskConfiguration,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			sixthExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, null,
-				invokableClass, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			seventhExceptionCaught = true;
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				null, outputGates, inputGates);
-		} catch (IllegalArgumentException e) {
-			eighthExceptionCaught = true;
-			
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass, null, inputGates);
-		} catch (IllegalArgumentException e) {
-			ninethExeceptionCaught = true;
-			
-		}
-
-		try {
-			new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass, outputGates, null);
-		} catch (IllegalArgumentException e) {
-			tenthExceptionCaught = true;
-		}
-
-		if (!firstExceptionCaught) {
-			fail("First argument was illegal but not detected");
-		}
-
-		if (!secondExceptionCaught) {
-			fail("Second argument was illegal but not detected");
-		}
-
-		if (!thirdExceptionCaught) {
-			fail("Third argument was illegal but not detected");
-		}
-
-		if (!forthExceptionCaught) {
-			fail("Forth argument was illegal but not detected");
-		}
-
-		if (!fifthExceptionCaught) {
-			fail("Fifth argument was illegal but not detected");
-		}
-
-		if (!sixthExceptionCaught) {
-			fail("Sixth argument was illegal but not detected");
-		}
-
-		if (!seventhExceptionCaught) {
-			fail("Seventh argument was illegal but not detected");
-		}
-
-		if (!eighthExceptionCaught) {
-			fail("Eighth argument was illegal but not detected");
-		}
-
-		if (!ninethExeceptionCaught) {
-			fail("Nineth argument was illegal but not detected");
-		}
-
-		if (!tenthExceptionCaught) {
-			fail("Tenth argument was illegal but not detected");
-		}
-
-	}
-
-	/**
-	 * Tests the serialization/deserialization of the {@link TaskDeploymentDescriptor} class.
-	 */
 	@Test
 	public void testSerialization() {
-
-		final JobID jobID = new JobID();
-		final ExecutionVertexID vertexID = new ExecutionVertexID();
-		final String taskName = "task name";
-		final int indexInSubtaskGroup = 0;
-		final int currentNumberOfSubtasks = 1;
-		final Configuration jobConfiguration = new Configuration();
-		final Configuration taskConfiguration = new Configuration();
-		final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
-		final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
-			0);
-		final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
-			0);
-
-		final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, taskName,
-			indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-			invokableClass, outputGates, inputGates);
-
-		TaskDeploymentDescriptor copy = null;
-
-		try {
-			LibraryCacheManager.register(jobID, new String[] {});
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		}
-
-		try {
-			copy = ServerTestUtils.createCopy(orig);
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
-		}
-
-		assertFalse(orig.getJobID() == copy.getJobID());
-		assertFalse(orig.getVertexID() == copy.getVertexID());
-		assertFalse(orig.getTaskName() == copy.getTaskName());
-		assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
-		assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
-
-		assertEquals(orig.getJobID(), copy.getJobID());
-		assertEquals(orig.getVertexID(), copy.getVertexID());
-		assertEquals(orig.getTaskName(), copy.getTaskName());
-		assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
-		assertEquals(orig.getCurrentNumberOfSubtasks(), copy.getCurrentNumberOfSubtasks());
-		assertEquals(orig.getNumberOfOutputGateDescriptors(), copy.getNumberOfOutputGateDescriptors());
-		assertEquals(orig.getNumberOfInputGateDescriptors(), copy.getNumberOfInputGateDescriptors());
-
 		try {
-			LibraryCacheManager.register(jobID, new String[] {});
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
+			final JobID jobID = new JobID();
+			final JobVertexID vertexID = new JobVertexID();
+			final ExecutionAttemptID execId = new ExecutionAttemptID();
+			final String taskName = "task name";
+			final int indexInSubtaskGroup = 0;
+			final int currentNumberOfSubtasks = 1;
+			final Configuration jobConfiguration = new Configuration();
+			final Configuration taskConfiguration = new Configuration();
+			final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
+			final List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(0);
+			final List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(0);
+	
+			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
+				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
+				invokableClass.getName(), outputGates, inputGates, new String[] { "jar1", "jar2" }, 47);
+	
+			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopyWritable(orig);
+	
+			assertFalse(orig.getJobID() == copy.getJobID());
+			assertFalse(orig.getVertexID() == copy.getVertexID());
+			assertFalse(orig.getTaskName() == copy.getTaskName());
+			assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
+			assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
+	
+			assertEquals(orig.getJobID(), copy.getJobID());
+			assertEquals(orig.getVertexID(), copy.getVertexID());
+			assertEquals(orig.getTaskName(), copy.getTaskName());
+			assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
+			assertEquals(orig.getCurrentNumberOfSubtasks(), copy.getCurrentNumberOfSubtasks());
+			assertEquals(orig.getOutputGates(), copy.getOutputGates());
+			assertEquals(orig.getInputGates(), copy.getInputGates());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
 	}
 }