You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:49:02 UTC

[42/50] [abbrv] flink git commit: [FLINK-4505] [cluster mngt] Implement TaskManager component's startup

[FLINK-4505] [cluster mngt] Implement TaskManager component's startup

The TaskManagerRunner now contains the startup logic for the TaskManager's components.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdc3a0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdc3a0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdc3a0a7

Branch: refs/heads/flip-6
Commit: bdc3a0a7d55eb2be215b3ef8324a09c9c344c5fb
Parents: d5dff4b
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Fri Sep 2 18:00:49 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:45 2016 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 766 +------------------
 .../runtime/taskmanager/TaskManagerRunner.java  | 749 ++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutorTest.java  |  53 +-
 3 files changed, 804 insertions(+), 764 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdc3a0a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9d9ad2a..8ce2780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,74 +18,29 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
 import org.jboss.netty.channel.ChannelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskmanager.MemoryLogger;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.NetUtils;
-
-import scala.Tuple2;
-import scala.Option;
-import scala.Some;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -98,12 +53,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
 
-	/** The unique resource ID of this TaskExecutor */
-	private final ResourceID resourceID;
-
+	/** The connection information of this task manager */
 	private final TaskManagerLocation taskManagerLocation;
 
-	/** The access to the leader election and metadata storage services */
+	/** The access to the leader election and retrieval services */
 	private final HighAvailabilityServices haServices;
 
 	/** The task manager configuration */
@@ -128,28 +81,26 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ------------------------------------------------------------------------
 
 	public TaskExecutor(
-			TaskExecutorConfiguration taskExecutorConfig,
-			ResourceID resourceID,
-			TaskManagerLocation taskManagerLocation,
-			MemoryManager memoryManager,
-			IOManager ioManager,
-			NetworkEnvironment networkEnvironment,
-			RpcService rpcService,
-			HighAvailabilityServices haServices) {
+		TaskExecutorConfiguration taskExecutorConfig,
+		TaskManagerLocation taskManagerLocation,
+		RpcService rpcService,
+		MemoryManager memoryManager,
+		IOManager ioManager,
+		NetworkEnvironment networkEnvironment,
+		HighAvailabilityServices haServices) {
 
 		super(rpcService);
 
 		checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
 
 		this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
-		this.resourceID = checkNotNull(resourceID);
 		this.taskManagerLocation = checkNotNull(taskManagerLocation);
 		this.memoryManager = checkNotNull(memoryManager);
 		this.ioManager = checkNotNull(ioManager);
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
 
-		this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
+		this.numberOfSlots =  taskExecutorConfig.getNumberOfSlots();
 	}
 
 	// ------------------------------------------------------------------------
@@ -207,7 +158,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	/**
 	 * Requests a slot from the TaskManager
 	 *
 	 * @param allocationID id for the request
@@ -220,126 +170,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	}
 
 	/**
-	 * Starts and runs the TaskManager.
-	 * <p/>
-	 * This method first tries to select the network interface to use for the TaskManager
-	 * communication. The network interface is used both for the actor communication
-	 * (coordination) as well as for the data exchange between task managers. Unless
-	 * the hostname/interface is explicitly configured in the configuration, this
-	 * method will try out various interfaces and methods to connect to the JobManager
-	 * and select the one where the connection attempt is successful.
-	 * <p/>
-	 * After selecting the network interface, this method brings up an actor system
-	 * for the TaskManager and its actors, starts the TaskManager's services
-	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
-	 *
-	 * @param configuration    The configuration for the TaskManager.
-	 * @param resourceID       The id of the resource which the task manager will run on.
-	 */
-	public static void selectNetworkInterfaceAndRunTaskManager(
-		Configuration configuration,
-		ResourceID resourceID) throws Exception {
-
-		final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
-
-		runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
-	}
-
-	private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
-		throws Exception {
-		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
-		if (taskManagerHostname != null) {
-			LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
-		} else {
-			LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
-			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
-			taskManagerHostname = taskManagerAddress.getHostName();
-			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
-				taskManagerHostname, taskManagerAddress.getHostAddress());
-		}
-
-		// if no task manager port has been configured, use 0 (system will pick any free port)
-		final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
-		if (actorSystemPort < 0 || actorSystemPort > 65535) {
-			throw new IllegalConfigurationException("Invalid value for '" +
-				ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
-				"' (port for the TaskManager actor system) : " + actorSystemPort +
-				" - Leave config parameter empty or use 0 to let the system choose a port automatically.");
-		}
-
-		return new InetSocketAddress(taskManagerHostname, actorSystemPort);
-	}
-
-	/**
-	 * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
-	 * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
-	 * and starts the TaskManager itself.
-	 * <p/>
-	 * This method will also spawn a process reaper for the TaskManager (kill the process if
-	 * the actor fails) and optionally start the JVM memory logging thread.
-	 *
-	 * @param taskManagerHostname The hostname/address of the interface where the actor system
-	 *                            will communicate.
-	 * @param resourceID          The id of the resource which the task manager will run on.
-	 * @param actorSystemPort   The port at which the actor system will communicate.
-	 * @param configuration       The configuration for the TaskManager.
-	 */
-	private static void runTaskManager(
-		String taskManagerHostname,
-		ResourceID resourceID,
-		int actorSystemPort,
-		final Configuration configuration) throws Exception {
-
-		LOG.info("Starting TaskManager");
-
-		// Bring up the TaskManager actor system first, bind it to the given address.
-
-		LOG.info("Starting TaskManager actor system at " +
-			NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
-
-		final ActorSystem taskManagerSystem;
-		try {
-			Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
-			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
-			LOG.debug("Using akka configuration\n " + akkaConfig);
-			taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
-		} catch (Throwable t) {
-			if (t instanceof ChannelException) {
-				Throwable cause = t.getCause();
-				if (cause != null && t.getCause() instanceof BindException) {
-					String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
-					throw new IOException("Unable to bind TaskManager actor system to address " +
-						address + " - " + cause.getMessage(), t);
-				}
-			}
-			throw new Exception("Could not create TaskManager actor system", t);
-		}
-
-		// start akka rpc service based on actor system
-		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
-		final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
-
-		// start high availability service to implement getResourceManagerLeaderRetriever method only
-		final HighAvailabilityServices haServices = new HighAvailabilityServices() {
-			@Override
-			public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
-				return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			}
-
-			@Override
 			public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
 				return null;
 			}
 
 			@Override
-			public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
-				return null;
-			}
-
-			@Override
-			public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
 				return null;
 			}
 
@@ -350,552 +185,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 			@Override
 			public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-				return null;
-			}
-		};
-
-		// start all the TaskManager services (network stack,  library cache, ...)
-		// and the TaskManager actor
-		try {
-			LOG.info("Starting TaskManager actor");
-			TaskExecutor taskExecutor = startTaskManagerComponentsAndActor(
-				configuration,
-				resourceID,
-				akkaRpcService,
-				taskManagerHostname,
-				haServices,
-				false);
-
-			taskExecutor.start();
-
-			// if desired, start the logging daemon that periodically logs the memory usage information
-			if (LOG.isInfoEnabled() && configuration.getBoolean(
-				ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
-				ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
-				LOG.info("Starting periodic memory usage logger");
-
-				long interval = configuration.getLong(
-					ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
-					ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
-
-				MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem);
-				logger.start();
-			}
-
-			// block until everything is done
-			taskManagerSystem.awaitTermination();
-		} catch (Throwable t) {
-			LOG.error("Error while starting up taskManager", t);
-			try {
-				taskManagerSystem.shutdown();
-			} catch (Throwable tt) {
-				LOG.warn("Could not cleanly shut down actor system", tt);
-			}
-			throw t;
-		}
-	}
-
-	// --------------------------------------------------------------------------
-	//  Starting and running the TaskManager
-	// --------------------------------------------------------------------------
-
-	/**
-	 * @param configuration                 The configuration for the TaskManager.
-	 * @param resourceID                    The id of the resource which the task manager will run on.
-	 * @param rpcService                  The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
-	 * @param taskManagerHostname       The hostname/address that describes the TaskManager's data location.
-	 * @param haServices        Optionally, a high availability service can be provided. If none is given,
-	 *                                      then a HighAvailabilityServices is constructed from the configuration.
-	 * @param localTaskManagerCommunication     If true, the TaskManager will not initiate the TCP network stack.
-	 * @return An ActorRef to the TaskManager actor.
-	 * @throws IllegalConfigurationException     Thrown, if the given config contains illegal values.
-	 * @throws IOException      Thrown, if any of the I/O components (such as buffer pools,
-	 *                                       I/O manager, ...) cannot be properly started.
-	 * @throws Exception      Thrown is some other error occurs while parsing the configuration
-	 *                                      or starting the TaskManager components.
-	 */
-	public static TaskExecutor startTaskManagerComponentsAndActor(
-		Configuration configuration,
-		ResourceID resourceID,
-		RpcService rpcService,
-		String taskManagerHostname,
-		HighAvailabilityServices haServices,
-		boolean localTaskManagerCommunication) throws Exception {
-
-		final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration(
-			configuration, taskManagerHostname, localTaskManagerCommunication);
-
-		TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
-			resourceID,
-			InetAddress.getByName(taskManagerHostname),
-			taskExecutorConfig,
-			configuration);
-
-		final TaskExecutor taskExecutor = new TaskExecutor(
-			taskExecutorConfig,
-			resourceID,
-			taskManagerComponents.getTaskManagerLocation(),
-			taskManagerComponents.getMemoryManager(),
-			taskManagerComponents.getIOManager(),
-			taskManagerComponents.getNetworkEnvironment(),
-			rpcService,
-			haServices);
-
-		return taskExecutor;
-	}
-
-	/**
-	 * Creates and returns the task manager components.
-	 *
-	 * @param resourceID resource ID of the task manager
-	 * @param taskManagerAddress address of the task manager
-	 * @param taskExecutorConfig task manager configuration
-	 * @param configuration of Flink
-	 * @return task manager components
-	 * @throws Exception
-	 */
-	private static TaskExecutor.TaskManagerComponents createTaskManagerComponents(
-		ResourceID resourceID,
-		InetAddress taskManagerAddress,
-		TaskExecutorConfiguration taskExecutorConfig,
-		Configuration configuration) throws Exception {
-		MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
-
-		// pre-start checks
-		checkTempDirs(taskExecutorConfig.getTmpDirPaths());
-
-		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
-
-		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-			networkEnvironmentConfiguration.numNetworkBuffers(),
-			networkEnvironmentConfiguration.networkBufferSize(),
-			networkEnvironmentConfiguration.memoryType());
-
-		ConnectionManager connectionManager;
-
-		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
-			connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
-		} else {
-			connectionManager = new LocalConnectionManager();
-		}
-
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
-
-		KvStateRegistry kvStateRegistry = new KvStateRegistry();
-
-		KvStateServer kvStateServer;
-
-		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
-			NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
-
-			int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
-				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
-
-			int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
-				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
-
-			kvStateServer = new KvStateServer(
-				taskManagerAddress,
-				networkEnvironmentConfiguration.queryServerPort(),
-				numNetworkThreads,
-				numQueryThreads,
-				kvStateRegistry,
-				new DisabledKvStateRequestStats());
-		} else {
-			kvStateServer = null;
-		}
-
-		// we start the network first, to make sure it can allocate its buffers first
-		final NetworkEnvironment network = new NetworkEnvironment(
-			networkBufferPool,
-			connectionManager,
-			resultPartitionManager,
-			taskEventDispatcher,
-			kvStateRegistry,
-			kvStateServer,
-			networkEnvironmentConfiguration.ioMode(),
-			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
-			networkEnvironmentConfiguration.partitinRequestMaxBackoff());
-
-		network.start();
-
-		TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
-			resourceID,
-			taskManagerAddress,
-			network.getConnectionManager().getDataPort());
-
-		// computing the amount of memory to use depends on how much memory is available
-		// it strictly needs to happen AFTER the network stack has been initialized
-
-		// check if a value has been configured
-		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-			"MemoryManager needs at least one MB of memory. " +
-				"If you leave this config parameter empty, the system automatically " +
-				"pick a fraction of the available memory.");
-
-		final long memorySize;
-		boolean preAllocateMemory = configuration.getBoolean(
-			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
-		if (configuredMemory > 0) {
-			if (preAllocateMemory) {
-				LOG.info("Using {} MB for managed memory." , configuredMemory);
-			} else {
-				LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
-			}
-			memorySize = configuredMemory << 20; // megabytes to bytes
-		} else {
-			float fraction = configuration.getFloat(
-				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-				ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-			checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
-				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-				"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
-
-			if (memType == MemoryType.HEAP) {
-				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
-				if (preAllocateMemory) {
-					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
-						fraction , relativeMemSize >> 20);
-				} else {
-					LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
-						"memory will be allocated lazily." , fraction , relativeMemSize >> 20);
-				}
-				memorySize = relativeMemSize;
-			} else if (memType == MemoryType.OFF_HEAP) {
-				// The maximum heap memory has been adjusted according to the fraction
-				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
-				long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
-				if (preAllocateMemory) {
-					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
-						fraction, directMemorySize >> 20);
-				} else {
-					LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
-						" memory will be allocated lazily.", fraction, directMemorySize >> 20);
-				}
-				memorySize = directMemorySize;
-			} else {
-				throw new RuntimeException("No supported memory type detected.");
-			}
-		}
-
-		// now start the memory manager
-		final MemoryManager memoryManager;
-		try {
-			memoryManager = new MemoryManager(
-				memorySize,
-				taskExecutorConfig.getNumberOfSlots(),
-				taskExecutorConfig.getNetworkConfig().networkBufferSize(),
-				memType,
-				preAllocateMemory);
-		} catch (OutOfMemoryError e) {
-			if (memType == MemoryType.HEAP) {
-				throw new Exception("OutOfMemory error (" + e.getMessage() +
-					") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
-			} else if (memType == MemoryType.OFF_HEAP) {
-				throw new Exception("OutOfMemory error (" + e.getMessage() +
-					") while allocating the TaskManager off-heap memory (" + memorySize +
-					" bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
-			} else {
-				throw e;
-			}
-		}
-
-		// start the I/O manager, it will create some temp directories.
-		final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
-
-		return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
-	}
-
-	// --------------------------------------------------------------------------
-	//  Parsing and checking the TaskManager Configuration
-	// --------------------------------------------------------------------------
-
-	/**
-	 * Utility method to extract TaskManager config parameters from the configuration and to
-	 * sanity check them.
-	 *
-	 * @param configuration                 The configuration.
-	 * @param taskManagerHostname           The host name under which the TaskManager communicates.
-	 * @param localTaskManagerCommunication             True, to skip initializing the network stack.
-	 *                                      Use only in cases where only one task manager runs.
-	 * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
-	 */
-	private static TaskExecutorConfiguration parseTaskManagerConfiguration(
-		Configuration configuration,
-		String taskManagerHostname,
-		boolean localTaskManagerCommunication) throws Exception {
-
-		// ------- read values from the config and check them ---------
-		//                      (a lot of them)
-
-		// ----> hosts / ports for communication and data exchange
-
-		int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
-		if (dataport == 0) {
-			dataport = NetUtils.getAvailablePort();
-		}
-		checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
-
-		InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
-		final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
-
-		// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
-
-		// we need this because many configs have been written with a "-1" entry
-		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-		if (slots == -1) {
-			slots = 1;
-		}
-		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-			"Number of task slots must be at least one.");
-
-		final int numNetworkBuffers = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
-		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
-
-		final int pageSize = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
-		// check page size of for minimum size
-		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
-		// check page size for power of two
-		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			"Memory segment size must be a power of 2.");
-
-		// check whether we use heap or off-heap memory
-		final MemoryType memType;
-		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
-			memType = MemoryType.OFF_HEAP;
-		} else {
-			memType = MemoryType.HEAP;
-		}
-
-		// initialize the memory segment factory accordingly
-		if (memType == MemoryType.HEAP) {
-			if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
-				throw new Exception("Memory type is set to heap memory, but memory segment " +
-					"factory has been initialized for off-heap memory segments");
-			}
-		} else {
-			if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
-				throw new Exception("Memory type is set to off-heap memory, but memory segment " +
-					"factory has been initialized for heap memory segments");
-			}
-		}
-
-		final String[] tmpDirs = configuration.getString(
-			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
-
-		final NettyConfig nettyConfig;
-		if (!localTaskManagerCommunication) {
-			nettyConfig = new NettyConfig(
-				taskManagerInetSocketAddress.getAddress(),
-				taskManagerInetSocketAddress.getPort(),
-				pageSize,
-				slots,
-				configuration);
-		} else {
-			nettyConfig = null;
-		}
-
-		// Default spill I/O mode for intermediate results
-		final String syncOrAsync = configuration.getString(
-			ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
-
-		final IOMode ioMode;
-		if (syncOrAsync.equals("async")) {
-			ioMode = IOManager.IOMode.ASYNC;
-		} else {
-			ioMode = IOManager.IOMode.SYNC;
-		}
-
-		final int queryServerPort =  configuration.getInteger(
-			ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
-			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
-
-		final int queryServerNetworkThreads =  configuration.getInteger(
-			ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
-			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
-
-		final int queryServerQueryThreads =  configuration.getInteger(
-			ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
-			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
-
-		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
-			numNetworkBuffers,
-			pageSize,
-			memType,
-			ioMode,
-			queryServerPort,
-			queryServerNetworkThreads,
-			queryServerQueryThreads,
-			Option.apply(nettyConfig),
-			500,
-			30000);
-
-		// ----> timeouts, library caching, profiling
-
-		final FiniteDuration timeout;
-		try {
-			timeout = AkkaUtils.getTimeout(configuration);
-		} catch (Exception e) {
-			throw new IllegalArgumentException(
-				"Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
-					"'.Use formats like '50 s' or '1 min' to specify the timeout.");
-		}
-		LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
-
-		final long cleanupInterval = configuration.getLong(
-			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
-		final FiniteDuration finiteRegistrationDuration;
-		try {
-			Duration maxRegistrationDuration = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
-			if (maxRegistrationDuration.isFinite()) {
-				finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				finiteRegistrationDuration = null;
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
-		}
-
-		final FiniteDuration initialRegistrationPause;
-		try {
-			Duration pause = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-				ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
-			if (pause.isFinite()) {
-				initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-		}
-
-		final FiniteDuration maxRegistrationPause;
-		try {
-			Duration pause = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
-				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
-			if (pause.isFinite()) {
-				maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-		}
-
-		final FiniteDuration refusedRegistrationPause;
-		try {
-			Duration pause = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
-				ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
-			if (pause.isFinite()) {
-				refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-		}
-
-		return new TaskExecutorConfiguration(
-			tmpDirs,
-			cleanupInterval,
-			networkConfig,
-			timeout,
-			finiteRegistrationDuration,
-			slots,
-			configuration,
-			initialRegistrationPause,
-			maxRegistrationPause,
-			refusedRegistrationPause);
-	}
-
-	/**
-	 * Validates a condition for a config parameter and displays a standard exception, if the
-	 * the condition does not hold.
-	 *
-	 * @param condition    The condition that must hold. If the condition is false, an exception is thrown.
-	 * @param parameter    The parameter value. Will be shown in the exception message.
-	 * @param name         The name of the config parameter. Will be shown in the exception message.
-	 * @param errorMessage The optional custom error message to append to the exception message.
-	 */
-	private static void checkConfigParameter(
-		boolean condition,
-		Object parameter,
-		String name,
-		String errorMessage) {
-		if (!condition) {
-			throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
-		}
-	}
-
-	/**
-	 * Validates that all the directories denoted by the strings do actually exist, are proper
-	 * directories (not files), and are writable.
-	 *
-	 * @param tmpDirs The array of directory paths to check.
-	 * @throws Exception Thrown if any of the directories does not exist or is not writable
-	 *                   or is a file, rather than a directory.
-	 */
-	private static void checkTempDirs(String[] tmpDirs) throws IOException {
-		for (String dir : tmpDirs) {
-			if (dir != null && !dir.equals("")) {
-				File file = new File(dir);
-				if (!file.exists()) {
-					throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
-				}
-				if (!file.isDirectory()) {
-					throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
-				}
-				if (!file.canWrite()) {
-					throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
-				}
-
-				if (LOG.isInfoEnabled()) {
-					long totalSpaceGb = file.getTotalSpace() >> 30;
-					long usableSpaceGb = file.getUsableSpace() >> 30;
-					double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100;
-					String path = file.getAbsolutePath();
-					LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
-						path, totalSpaceGb, usableSpaceGb, usablePercentage));
-				}
-			} else {
-				throw new IllegalArgumentException("Temporary file directory #$id is null.");
-			}
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
 
 	public ResourceID getResourceID() {
-		return resourceID;
+		return taskManagerLocation.getResourceID();
 	}
 
 	// ------------------------------------------------------------------------
@@ -959,37 +254,4 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private static class TaskManagerComponents {
-		private final TaskManagerLocation taskManagerLocation;
-		private final MemoryManager memoryManager;
-		private final IOManager ioManager;
-		private final NetworkEnvironment networkEnvironment;
-
-		private TaskManagerComponents(
-				TaskManagerLocation taskManagerLocation,
-				MemoryManager memoryManager,
-				IOManager ioManager,
-				NetworkEnvironment networkEnvironment) {
-			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
-			this.memoryManager = Preconditions.checkNotNull(memoryManager);
-			this.ioManager = Preconditions.checkNotNull(ioManager);
-			this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
-		}
-
-		public MemoryManager getMemoryManager() {
-			return memoryManager;
-		}
-
-		public IOManager getIOManager() {
-			return ioManager;
-		}
-
-		public NetworkEnvironment getNetworkEnvironment() {
-			return networkEnvironment;
-		}
-
-		public TaskManagerLocation getTaskManagerLocation() {
-			return taskManagerLocation;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdc3a0a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
new file mode 100644
index 0000000..4f756fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
@@ -0,0 +1,749 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is the executable entry point for the task manager in yarn or standalone mode.
+ * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
+ * and starts them.
+ */
+public class TaskManagerRunner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+
+	/**
+	 * Constructs related components of the TaskManager and starts them.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @param resourceID The id of the resource which the task manager will run on.
+	 * @param rpcService Optionally, the rpc service which is used to start and connect to the TaskManager
+	 *                   RpcEndpoint. If null, an RpcService is constructed from the configuration.
+	 * @param taskManagerHostname Optionally, the hostname/address that describes the TaskManager's data
+	 *                   location. If null or empty, it is read from the configuration or auto-detected.
+	 * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+	 * @param haServices Optionally, the high availability services. If null, a minimal implementation is
+	 *                   used that only provides the ResourceManager leader retriever.
+	 */
+	public static void createAndStartComponents(
+		final Configuration configuration,
+		final ResourceID resourceID,
+		RpcService rpcService,
+		String taskManagerHostname,
+		boolean localTaskManagerCommunication,
+		HighAvailabilityServices haServices) throws Exception {
+
+		checkNotNull(configuration);
+		checkNotNull(resourceID);
+
+		if (taskManagerHostname == null || taskManagerHostname.isEmpty()) {
+			taskManagerHostname = selectNetworkInterface(configuration);
+		}
+
+		if (rpcService == null) {
+			// if no task manager port has been configured, use 0 (system will pick any free port)
+			final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+			if (actorSystemPort < 0 || actorSystemPort > 65535) {
+				throw new IllegalConfigurationException("Invalid value for '" +
+					ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+					"' (port for the TaskManager actor system) : " + actorSystemPort +
+					" - Leave config parameter empty or use 0 to let the system choose a port automatically.");
+			}
+			rpcService = createRpcService(configuration, taskManagerHostname, actorSystemPort);
+		}
+
+		if(haServices == null) {
+			// fallback HA services: only the ResourceManager leader retriever works, both election services return null
+			haServices = new HighAvailabilityServices() {
+				@Override
+				public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+					return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+				}
+
+				@Override
+				public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+					return null;
+				}
+
+				@Override
+				public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+					return null;
+				}
+			};
+		}
+
+		createAndStartTaskManagerComponents(
+			configuration,
+			resourceID,
+			rpcService,
+			taskManagerHostname,
+			haServices,
+			localTaskManagerCommunication);
+	}
+
+	/**
+	 * Determines the hostname/address under which the TaskManager communicates.
+	 *
+	 * <p>The network interface is used both for the actor communication
+	 * (coordination) as well as for the data exchange between task managers. Unless
+	 * the hostname/interface is explicitly configured in the configuration, this
+	 * method will try out various interfaces and methods to connect to the JobManager
+	 * and select the one where the connection attempt is successful.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @return The host name under which the TaskManager communicates.
+	 * @throws Exception Propagated from creating the leader retrieval service or from the address lookup.
+	 */
+	private static String selectNetworkInterface(Configuration configuration) throws Exception {
+		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+		if (taskManagerHostname != null) {
+			LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+		} else {
+			LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
+			taskManagerHostname = taskManagerAddress.getHostName();
+			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+				taskManagerHostname, taskManagerAddress.getHostAddress());
+		}
+
+		return taskManagerHostname;
+	}
+
+	/**
+	 * Utility method to create RPC service from configuration and hostname, port.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+	 * @param actorSystemPort The port the actor system is bound to; the caller passes 0 to let the system pick a free port.
+	 * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint.
+	 * @throws java.io.IOException Thrown, if the actor system can not bind to the address
+	 * @throws java.lang.Exception Thrown if some other error occurs while creating akka actor system
+	 */
+	private static RpcService createRpcService(Configuration configuration, String taskManagerHostname, int actorSystemPort)
+		throws Exception{
+
+		// Bring up the TaskManager actor system first, bind it to the given address.
+
+		LOG.info("Starting TaskManager actor system at " +
+			NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
+
+		final ActorSystem taskManagerSystem;
+		try {
+			Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
+			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+			LOG.debug("Using akka configuration\n " + akkaConfig);
+			taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
+		} catch (Throwable t) {
+			if (t instanceof org.jboss.netty.channel.ChannelException) {
+				Throwable cause = t.getCause();
+				if (cause != null && t.getCause() instanceof java.net.BindException) {
+					String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+					throw new IOException("Unable to bind TaskManager actor system to address " +
+						address + " - " + cause.getMessage(), t);
+				}
+			}
+			throw new Exception("Could not create TaskManager actor system", t);
+		}
+
+		// wrap the actor system in an akka rpc service, using the timeout from AkkaUtils.getTimeout
+		final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+		final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
+
+		return akkaRpcService;
+	}
+
+	/**
+	 * Parses the TaskManager configuration, creates the TaskManager components and starts the TaskExecutor.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @param resourceID The id of the resource which the task manager will run on.
+	 * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint.
+	 * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+	 * @param haServices The high availability services that are handed to the TaskExecutor.
+	 * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+	 * @throws IllegalConfigurationException Thrown, if the given config contains illegal values.
+	 * @throws IOException Thrown, if any of the I/O components (such as buffer pools, I/O manager, ...)
+	 *                     cannot be properly started.
+	 * @throws Exception Thrown if some other error occurs while parsing the configuration or starting the components.
+	 */
+	private static void createAndStartTaskManagerComponents(
+		Configuration configuration,
+		ResourceID resourceID,
+		RpcService rpcService,
+		String taskManagerHostname,
+		HighAvailabilityServices haServices,
+		boolean localTaskManagerCommunication) throws Exception {
+
+		final TaskExecutorConfiguration taskManagerConfig = parseTaskManagerConfiguration(
+			configuration, taskManagerHostname, localTaskManagerCommunication);
+
+		TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
+			resourceID,
+			InetAddress.getByName(taskManagerHostname),
+			taskManagerConfig,
+			configuration);
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			taskManagerConfig,
+			taskManagerComponents.getTaskManagerLocation(),
+			rpcService, taskManagerComponents.getMemoryManager(),
+			taskManagerComponents.getIOManager(),
+			taskManagerComponents.getNetworkEnvironment(),
+			haServices);
+
+		taskExecutor.start();
+	}
+
+	/**
+	 * Creates the task manager components: starts the network environment, then creates the memory and I/O manager.
+	 *
+	 * @param resourceID resource ID of the task manager
+	 * @param taskManagerAddress address of the task manager, used for the KvStateServer and the TaskManagerLocation
+	 * @param taskExecutorConfig task manager configuration (network config, temp dirs, number of slots)
+	 * @param configuration Flink configuration, read here for the managed memory settings
+	 * @return task manager components (location, memory manager, I/O manager, network environment)
+	 * @throws Exception if a pre-start check fails, a memory setting is invalid or the memory cannot be allocated
+	 */
+	private static TaskManagerComponents createTaskManagerComponents(
+		ResourceID resourceID,
+		InetAddress taskManagerAddress,
+		TaskExecutorConfiguration taskExecutorConfig,
+		Configuration configuration) throws Exception {
+
+		MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
+
+		// pre-start checks: verify the temp directories before any component is created
+		checkTempDirs(taskExecutorConfig.getTmpDirPaths());
+
+		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
+
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+			networkEnvironmentConfiguration.numNetworkBuffers(),
+			networkEnvironmentConfiguration.networkBufferSize(),
+			networkEnvironmentConfiguration.memoryType());
+
+		ConnectionManager connectionManager;
+
+		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+			connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
+		} else {
+			connectionManager = new LocalConnectionManager();
+		}
+
+		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+		KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+		KvStateServer kvStateServer;
+
+		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+			NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
+
+			int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+			int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+			kvStateServer = new KvStateServer(
+				taskManagerAddress,
+				networkEnvironmentConfiguration.queryServerPort(),
+				numNetworkThreads,
+				numQueryThreads,
+				kvStateRegistry,
+				new DisabledKvStateRequestStats());
+		} else {
+			kvStateServer = null;
+		}
+
+		// we start the network first, to make sure it can allocate its buffers first
+		final NetworkEnvironment network = new NetworkEnvironment(
+			networkBufferPool,
+			connectionManager,
+			resultPartitionManager,
+			taskEventDispatcher,
+			kvStateRegistry,
+			kvStateServer,
+			networkEnvironmentConfiguration.ioMode(),
+			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+			networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+		network.start();
+
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+			resourceID,
+			taskManagerAddress,
+			network.getConnectionManager().getDataPort());
+
+		// computing the amount of memory to use depends on how much memory is available
+		// it strictly needs to happen AFTER the network stack has been initialized
+
+		// check if a value has been configured (-1 means that no value has been configured)
+		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+			"MemoryManager needs at least one MB of memory. " +
+				"If you leave this config parameter empty, the system automatically " +
+				"pick a fraction of the available memory.");
+
+		final long memorySize;
+		boolean preAllocateMemory = configuration.getBoolean(
+			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+		if (configuredMemory > 0) {
+			if (preAllocateMemory) {
+				LOG.info("Using {} MB for managed memory." , configuredMemory);
+			} else {
+				LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+			}
+			memorySize = configuredMemory << 20; // megabytes to bytes
+		} else {
+			float fraction = configuration.getFloat(
+				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+				ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+			checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
+				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+				"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+			if (memType == MemoryType.HEAP) {
+				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+				if (preAllocateMemory) {
+					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+						fraction , relativeMemSize >> 20);
+				} else {
+					LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+						"memory will be allocated lazily." , fraction , relativeMemSize >> 20);
+				}
+				memorySize = relativeMemSize;
+			} else if (memType == MemoryType.OFF_HEAP) {
+				// The maximum heap memory has been adjusted according to the fraction
+				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+				long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
+				if (preAllocateMemory) {
+					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+						fraction, directMemorySize >> 20);
+				} else {
+					LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+						" memory will be allocated lazily.", fraction, directMemorySize >> 20);
+				}
+				memorySize = directMemorySize;
+			} else {
+				throw new RuntimeException("No supported memory type detected.");
+			}
+		}
+
+		// now start the memory manager; NOTE(review): if this fails, the already started network is not shut down
+		final MemoryManager memoryManager;
+		try {
+			memoryManager = new MemoryManager(
+				memorySize,
+				taskExecutorConfig.getNumberOfSlots(),
+				taskExecutorConfig.getNetworkConfig().networkBufferSize(),
+				memType,
+				preAllocateMemory);
+		} catch (OutOfMemoryError e) {
+			if (memType == MemoryType.HEAP) {
+				throw new Exception("OutOfMemory error (" + e.getMessage() +
+					") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+			} else if (memType == MemoryType.OFF_HEAP) {
+				throw new Exception("OutOfMemory error (" + e.getMessage() +
+					") while allocating the TaskManager off-heap memory (" + memorySize +
+					" bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+			} else {
+				throw e;
+			}
+		}
+
+		// start the I/O manager, it will create some temp directories.
+		final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
+
+		return new TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
+	}
+
+	// --------------------------------------------------------------------------
+	//  Parsing and checking the TaskManager Configuration
+	// --------------------------------------------------------------------------
+
+	/**
+	 * Utility method to extract TaskManager config parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration                         The configuration.
+	 * @param taskManagerHostname           The host name under which the TaskManager communicates.
+	 * @param localTaskManagerCommunication             True, to skip initializing the network stack.
+	 *                                      Use only in cases where only one task manager runs.
+	 * @return TaskExecutorConfiguration that wraps InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
+	 */
+	private static TaskExecutorConfiguration parseTaskManagerConfiguration(
+		Configuration configuration,
+		String taskManagerHostname,
+		boolean localTaskManagerCommunication) throws Exception {
+
+		// ------- read values from the config and check them ---------
+		//                      (a lot of them)
+
+		// ----> hosts / ports for communication and data exchange
+
+		int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+		checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+		InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
+		final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+		// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
+
+		// we need this because many configs have been written with a "-1" entry
+		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		if (slots == -1) {
+			slots = 1;
+		}
+
+		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+			"Number of task slots must be at least one.");
+
+		final int numNetworkBuffers = configuration.getInteger(
+			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+		final int pageSize = configuration.getInteger(
+			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+		// check page size for minimum size
+		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+
+		// check page size for power of two
+		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			"Memory segment size must be a power of 2.");
+
+		// check whether we use heap or off-heap memory
+		final MemoryType memType;
+		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+			memType = MemoryType.OFF_HEAP;
+		} else {
+			memType = MemoryType.HEAP;
+		}
+
+		// initialize the memory segment factory accordingly
+		if (memType == MemoryType.HEAP) {
+			if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+				throw new Exception("Memory type is set to heap memory, but memory segment " +
+					"factory has been initialized for off-heap memory segments");
+			}
+		} else {
+			if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
+				throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+					"factory has been initialized for heap memory segments");
+			}
+		}
+
+		final String[] tmpDirs = configuration.getString(
+			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+		final NettyConfig nettyConfig;
+		if (!localTaskManagerCommunication) {
+			nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
+				taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
+		} else {
+			nettyConfig = null;
+		}
+
+		// Default spill I/O mode for intermediate results
+		final String syncOrAsync = configuration.getString(
+			ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+		final IOManager.IOMode ioMode;
+		if (syncOrAsync.equals("async")) {
+			ioMode = IOManager.IOMode.ASYNC;
+		} else {
+			ioMode = IOManager.IOMode.SYNC;
+		}
+
+		final int queryServerPort =  configuration.getInteger(
+			ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+		final int queryServerNetworkThreads =  configuration.getInteger(
+			ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+		final int queryServerQueryThreads =  configuration.getInteger(
+			ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+			numNetworkBuffers,
+			pageSize,
+			memType,
+			ioMode,
+			queryServerPort,
+			queryServerNetworkThreads,
+			queryServerQueryThreads,
+			Option.apply(nettyConfig),
+			500,
+			3000);
+
+		// ----> timeouts, library caching, profiling
+
+		final FiniteDuration timeout;
+		try {
+			timeout = AkkaUtils.getTimeout(configuration);
+		} catch (Exception e) {
+			throw new IllegalArgumentException(
+				"Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+					"'.Use formats like '50 s' or '1 min' to specify the timeout.");
+		}
+		LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
+
+		final long cleanupInterval = configuration.getLong(
+			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+		final FiniteDuration finiteRegistrationDuration;
+		try {
+			Duration maxRegistrationDuration = Duration.create(configuration.getString(
+				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+			if (maxRegistrationDuration.isFinite()) {
+				finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
+			} else {
+				finiteRegistrationDuration = null;
+			}
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Invalid format for parameter " +
+				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+		}
+
+		final FiniteDuration initialRegistrationPause;
+		try {
+			Duration pause = Duration.create(configuration.getString(
+				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+				ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
+			if (pause.isFinite()) {
+				initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+			} else {
+				throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
+			}
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Invalid format for parameter " +
+				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+		}
+
+		final FiniteDuration maxRegistrationPause;
+		try {
+			Duration pause = Duration.create(configuration.getString(
+				ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
+			if (pause.isFinite()) {
+				maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+			} else {
+				throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
+			}
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Invalid format for parameter " +
+				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+		}
+
+		final FiniteDuration refusedRegistrationPause;
+		try {
+			Duration pause = Duration.create(configuration.getString(
+				ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+				ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
+			if (pause.isFinite()) {
+				refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+			} else {
+				throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
+			}
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Invalid format for parameter " +
+				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+		}
+
+		return new TaskExecutorConfiguration(
+			tmpDirs,
+			cleanupInterval,
+			networkConfig,
+			timeout,
+			finiteRegistrationDuration,
+			slots,
+			configuration,
+			initialRegistrationPause,
+			maxRegistrationPause,
+			refusedRegistrationPause);
+	}
+
+	/**
+	 * Checks a requirement on a configuration value and fails with a uniformly formatted
+	 * exception when the requirement is violated.
+	 *
+	 * @param condition     The requirement to verify. Nothing happens when it is true.
+	 * @param parameter     The configured value. It is included in the exception message.
+	 * @param name          The name of the config parameter. It is included in the exception message.
+	 * @param errorMessage  The explanation that is appended to the exception message.
+	 * @throws IllegalConfigurationException Thrown if the condition is false.
+	 */
+	private static void checkConfigParameter(
+		boolean condition,
+		Object parameter,
+		String name,
+		String errorMessage) {
+		// nothing to report when the requirement is met
+		if (condition) {
+			return;
+		}
+
+		final String description =
+			"Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage;
+		throw new IllegalConfigurationException(description);
+	}
+
+	/**
+	 * Validates that all the directories denoted by the strings do actually exist, are proper
+	 * directories (not files), and are writable.
+	 *
+	 * <p>For every directory that passes the checks, its total and usable disk space is logged
+	 * at INFO level.
+	 *
+	 * @param tmpDirs       The array of directory paths to check.
+	 * @throws IOException  Thrown if any of the directories does not exist or is not writable
+	 *                      or is a file, rather than a directory.
+	 * @throws IllegalArgumentException Thrown if any entry of the array is null or an empty string.
+	 */
+	private static void checkTempDirs(String[] tmpDirs) throws IOException {
+		for (int i = 0; i < tmpDirs.length; i++) {
+			final String dir = tmpDirs[i];
+
+			// name the position of the offending entry so that the misconfiguration can be located
+			if (dir == null || dir.equals("")) {
+				throw new IllegalArgumentException("Temporary file directory #" + i + " is null or empty.");
+			}
+
+			final File file = new File(dir);
+			if (!file.exists()) {
+				throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+			}
+			if (!file.isDirectory()) {
+				throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+			}
+			if (!file.canWrite()) {
+				throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+			}
+
+			if (LOG.isInfoEnabled()) {
+				final long totalSpace = file.getTotalSpace();
+				final long usableSpace = file.getUsableSpace();
+				final long totalSpaceGb = totalSpace >> 30;
+				final long usableSpaceGb = usableSpace >> 30;
+				// compute the ratio from the byte counts and guard against a total of zero;
+				// dividing the GB-truncated values yields NaN for partitions smaller than 1 GB
+				final double usablePercentage = totalSpace > 0 ? 100.0 * usableSpace / totalSpace : 0.0;
+				final String path = file.getAbsolutePath();
+				LOG.info(String.format("Temporary file directory '%s': total %d GB, usable %d GB (%.2f%% usable)",
+					path, totalSpaceGb, usableSpaceGb, usablePercentage));
+			}
+		}
+	}
+
+	/**
+	 * Simple holder that bundles the task manager location, the memory manager, the I/O manager
+	 * and the network environment. All references are checked for null on construction.
+	 */
+	private static class TaskManagerComponents {
+
+		private final TaskManagerLocation taskManagerLocation;
+
+		private final MemoryManager memoryManager;
+
+		private final IOManager ioManager;
+
+		private final NetworkEnvironment networkEnvironment;
+
+		/**
+		 * Creates the holder. A NullPointerException is raised by the precondition check
+		 * if any of the arguments is null.
+		 */
+		private TaskManagerComponents(
+			TaskManagerLocation taskManagerLocation,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			NetworkEnvironment networkEnvironment) {
+
+			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+			this.memoryManager = Preconditions.checkNotNull(memoryManager);
+			this.ioManager = Preconditions.checkNotNull(ioManager);
+			this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+		}
+
+		/** Returns the task manager location held by this bundle. */
+		public TaskManagerLocation getTaskManagerLocation() {
+			return this.taskManagerLocation;
+		}
+
+		/** Returns the memory manager held by this bundle. */
+		public MemoryManager getMemoryManager() {
+			return this.memoryManager;
+		}
+
+		/** Returns the I/O manager held by this bundle. */
+		public IOManager getIOManager() {
+			return this.ioManager;
+		}
+
+		/** Returns the network environment held by this bundle. */
+		public NetworkEnvironment getNetworkEnvironment() {
+			return this.networkEnvironment;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdc3a0a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 09aab18..26218dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -19,17 +19,22 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import org.powermock.api.mockito.PowerMockito;
 import java.util.UUID;
 
 import static org.junit.Assert.*;
@@ -42,19 +47,31 @@ public class TaskExecutorTest extends TestLogger {
 		final ResourceID resourceID = ResourceID.generate();
 		final String resourceManagerAddress = "/resource/manager/address/one";
 
-		final TestingRpcService rpc = new TestingRpcService();
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
 			// register a mock resource manager gateway
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+			TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class);
+			PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
 			rpc.registerGateway(resourceManagerAddress, rmGateway);
 
+			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
 			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
-			TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
-				new Configuration(), resourceID, rpc, "localhost", haServices, true);
-			String taskManagerAddress = taskManager.getAddress();
+
+			TaskExecutor taskManager = new TaskExecutor(
+				taskExecutorConfiguration,
+				taskManagerLocation,
+				rpc, mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices);
+
 			taskManager.start();
+			String taskManagerAddress = taskManager.getAddress();
 
-			verify(rmGateway, timeout(5000)).registerTaskExecutor(
+			verify(rmGateway).registerTaskExecutor(
 					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class));
 		}
 		finally {
@@ -71,7 +88,7 @@ public class TaskExecutorTest extends TestLogger {
 		final UUID leaderId1 = UUID.randomUUID();
 		final UUID leaderId2 = UUID.randomUUID();
 
-		final TestingRpcService rpc = new TestingRpcService();
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
 			// register the mock resource manager gateways
 			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
@@ -84,10 +101,22 @@ public class TaskExecutorTest extends TestLogger {
 			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
 			haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
-			TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
-				new Configuration(), resourceID, rpc, "localhost", haServices, true);
-			String taskManagerAddress = taskManager.getAddress();
+			TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class);
+			PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
+
+			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
+			TaskExecutor taskManager = new TaskExecutor(
+				taskExecutorConfiguration,
+				taskManagerLocation,
+				rpc, mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices);
+
 			taskManager.start();
+			String taskManagerAddress = taskManager.getAddress();
 
 			// no connection initially, since there is no leader
 			assertNull(taskManager.getResourceManagerConnection());
@@ -95,7 +124,7 @@ public class TaskExecutorTest extends TestLogger {
 			// define a leader and see that a registration happens
 			testLeaderService.notifyListener(address1, leaderId1);
 
-			verify(rmGateway1, timeout(5000)).registerTaskExecutor(
+			verify(rmGateway1).registerTaskExecutor(
 					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
@@ -105,7 +134,7 @@ public class TaskExecutorTest extends TestLogger {
 			// set a new leader, see that a registration happens 
 			testLeaderService.notifyListener(address2, leaderId2);
 
-			verify(rmGateway2, timeout(5000)).registerTaskExecutor(
+			verify(rmGateway2).registerTaskExecutor(
 					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 		}