You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:49:05 UTC

[45/50] [abbrv] flink git commit: [FLINK-4505] [cluster mngt] Separate TaskManager service configuration from TaskManagerConfiguration; Implement TaskManagerRunner

[FLINK-4505] [cluster mngt] Separate TaskManager service configuration from TaskManagerConfiguration; Implement TaskManagerRunner

Refactors the startup logic so that it is easier to reuse.

This closes #2461.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1e45aec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1e45aec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1e45aec

Branch: refs/heads/flip-6
Commit: a1e45aecc556c2832c9e01e6624a2a2e34ff8e7a
Parents: bdc3a0a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 12:33:15 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:45 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServicesUtils.java          |  41 +
 .../flink/runtime/rpc/RpcServiceUtils.java      |  73 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |   2 +
 .../runtime/taskexecutor/TaskExecutor.java      |  51 +-
 .../taskexecutor/TaskExecutorConfiguration.java | 142 ----
 .../taskexecutor/TaskManagerConfiguration.java  | 205 +++++
 .../runtime/taskexecutor/TaskManagerRunner.java | 172 +++++
 .../taskexecutor/TaskManagerServices.java       | 320 ++++++++
 .../TaskManagerServicesConfiguration.java       | 325 ++++++++
 .../runtime/taskmanager/TaskManagerRunner.java  | 749 -------------------
 .../runtime/util/LeaderRetrievalUtils.java      |   7 +
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   4 +
 .../NetworkEnvironmentConfiguration.scala       |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../io/network/NetworkEnvironmentTest.java      |   4 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  |  29 +-
 ...askManagerComponentsStartupShutdownTest.java |   3 +-
 .../TaskManagerConfigurationTest.java           |   1 -
 19 files changed, 1195 insertions(+), 942 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
new file mode 100644
index 0000000..f3da847
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
+/**
+ * Utility for instantiating the {@link HighAvailabilityServices} that match the
+ * recovery mode selected in a Flink {@link Configuration}.
+ */
+public class HighAvailabilityServicesUtils {
+
+	/**
+	 * Creates the high availability services for the recovery mode found in the configuration.
+	 *
+	 * @param configuration configuration from which the recovery mode is read
+	 * @return non-HA services when the recovery mode is {@code NONE}
+	 * @throws UnsupportedOperationException if the recovery mode is {@code ZOOKEEPER}
+	 * @throws Exception if the recovery mode is any other value
+	 */
+	public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
+		final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(configuration);
+
+		switch (mode) {
+			case NONE: {
+				// No resource manager address is available here, so a null address is handed over.
+				final String noResourceManagerAddress = null;
+				return new NonHaServices(noResourceManagerAddress);
+			}
+			case ZOOKEEPER: {
+				throw new UnsupportedOperationException(
+					"ZooKeeper high availability services have not been implemented yet.");
+			}
+			default: {
+				throw new Exception("Recovery mode " + mode + " is not supported.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
new file mode 100644
index 0000000..d40e336
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.NetUtils;
+import org.jboss.netty.channel.ChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Utility methods for creating {@link RpcService} instances.
+ */
+public class RpcServiceUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
+
+	/**
+	 * Utility method to create an akka based RPC service from a configuration, hostname and port.
+	 *
+	 * @param hostname      The hostname/address the actor system is created for.
+	 * @param port          The port the actor system is created for.
+	 * @param configuration The configuration from which the akka config and the rpc timeout are derived.
+	 * @return The rpc service backed by the newly created actor system.
+	 * @throws IOException Thrown, if the actor system can not bind to the address
+	 * @throws Exception   Thrown, if some other error occurs while creating the akka actor system
+	 */
+	public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+		LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
+
+		final ActorSystem actorSystem;
+
+		try {
+			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+
+			LOG.debug("Using akka configuration \n {}.", akkaConfig);
+
+			actorSystem = AkkaUtils.createActorSystem(akkaConfig);
+		} catch (Throwable t) {
+			// A ChannelException caused by a BindException means the address is unavailable;
+			// report it as an IOException so callers can tell it apart from other failures.
+			// instanceof is false for null, so no separate null check on the cause is needed.
+			final Throwable cause = t.getCause();
+			if (t instanceof ChannelException && cause instanceof java.net.BindException) {
+				String address = NetUtils.hostAndPortToUrlString(hostname, port);
+				throw new IOException("Unable to bind AkkaRpcService actor system to address " +
+					address + " - " + cause.getMessage(), t);
+			}
+			throw new Exception("Could not create TaskManager actor system", t);
+		}
+
+		// the rpc service uses the configured akka timeout, converted to Flink's Time type
+		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+		return new AkkaRpcService(actorSystem, timeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6825557..fb7896a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -93,6 +93,8 @@ public class AkkaRpcService implements RpcService {
 
 		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
 
+
+
 		if (actorSystemAddress.host().isDefined()) {
 			address = actorSystemAddress.host().get();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8ce2780..7df0a91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,16 +18,14 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.jboss.netty.channel.ChannelException;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -39,7 +37,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.net.BindException;
+
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -60,7 +58,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private final HighAvailabilityServices haServices;
 
 	/** The task manager configuration */
-	private final TaskExecutorConfiguration taskExecutorConfig;
+	private final TaskManagerConfiguration taskManagerConfiguration;
 
 	/** The I/O manager component in the task manager */
 	private final IOManager ioManager;
@@ -71,9 +69,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The network component in the task manager */
 	private final NetworkEnvironment networkEnvironment;
 
+	/** The metric registry in the task manager */
+	private final MetricRegistry metricRegistry;
+
 	/** The number of slots in the task manager, should be 1 for YARN */
 	private final int numberOfSlots;
 
+	/** The fatal error handler to use in case of a fatal error */
+	private final FatalErrorHandler fatalErrorHandler;
+
 	// --------- resource manager --------
 
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@@ -81,26 +85,30 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ------------------------------------------------------------------------
 
 	public TaskExecutor(
-		TaskExecutorConfiguration taskExecutorConfig,
+		TaskManagerConfiguration taskManagerConfiguration,
 		TaskManagerLocation taskManagerLocation,
 		RpcService rpcService,
 		MemoryManager memoryManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
-		HighAvailabilityServices haServices) {
+		HighAvailabilityServices haServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler) {
 
 		super(rpcService);
 
-		checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
+		checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
 
-		this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
+		this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
 		this.taskManagerLocation = checkNotNull(taskManagerLocation);
 		this.memoryManager = checkNotNull(memoryManager);
 		this.ioManager = checkNotNull(ioManager);
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
+		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
-		this.numberOfSlots =  taskExecutorConfig.getNumberOfSlots();
+		this.numberOfSlots =  taskManagerConfiguration.getNumberSlots();
 	}
 
 	// ------------------------------------------------------------------------
@@ -158,6 +166,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	/**
 	 * Requests a slot from the TaskManager
 	 *
 	 * @param allocationID id for the request
@@ -169,22 +178,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return new SlotRequestRegistered(allocationID);
 	}
 
-	/**
-			public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
-				return null;
-			}
-
-			@Override
-				return null;
-			}
-
-			@Override
-			public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
-				return null;
-			}
-
-			@Override
-			public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
@@ -222,7 +215,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	void onFatalError(Throwable t) {
 		// to be determined, probably delegate to a fatal error handler that 
 		// would either log (mini cluster) ot kill the process (yarn, mesos, ...)
-		log.error("FATAL ERROR", t);
+		fatalErrorHandler.onFatalError(t);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
deleted file mode 100644
index c97c893..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * {@link TaskExecutor} Configuration
- */
-public class TaskExecutorConfiguration implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final String[] tmpDirPaths;
-
-	private final long cleanupInterval;
-
-	private final int numberOfSlots;
-
-	private final Configuration configuration;
-
-	private final FiniteDuration timeout;
-	private final FiniteDuration maxRegistrationDuration;
-	private final FiniteDuration initialRegistrationPause;
-	private final FiniteDuration maxRegistrationPause;
-	private final FiniteDuration refusedRegistrationPause;
-
-	private final NetworkEnvironmentConfiguration networkConfig;
-
-	public TaskExecutorConfiguration(
-			String[] tmpDirPaths,
-			long cleanupInterval,
-			NetworkEnvironmentConfiguration networkConfig,
-			FiniteDuration timeout,
-			FiniteDuration maxRegistrationDuration,
-			int numberOfSlots,
-			Configuration configuration) {
-
-		this (tmpDirPaths,
-			cleanupInterval,
-			networkConfig,
-			timeout,
-			maxRegistrationDuration,
-			numberOfSlots,
-			configuration,
-			new FiniteDuration(500, TimeUnit.MILLISECONDS),
-			new FiniteDuration(30, TimeUnit.SECONDS),
-			new FiniteDuration(10, TimeUnit.SECONDS));
-	}
-
-	public TaskExecutorConfiguration(
-			String[] tmpDirPaths,
-			long cleanupInterval,
-			NetworkEnvironmentConfiguration networkConfig,
-			FiniteDuration timeout,
-			FiniteDuration maxRegistrationDuration,
-			int numberOfSlots,
-			Configuration configuration,
-			FiniteDuration initialRegistrationPause,
-			FiniteDuration maxRegistrationPause,
-			FiniteDuration refusedRegistrationPause) {
-
-		this.tmpDirPaths = checkNotNull(tmpDirPaths);
-		this.cleanupInterval = checkNotNull(cleanupInterval);
-		this.networkConfig = checkNotNull(networkConfig);
-		this.timeout = checkNotNull(timeout);
-		this.maxRegistrationDuration = maxRegistrationDuration;
-		this.numberOfSlots = checkNotNull(numberOfSlots);
-		this.configuration = checkNotNull(configuration);
-		this.initialRegistrationPause = checkNotNull(initialRegistrationPause);
-		this.maxRegistrationPause = checkNotNull(maxRegistrationPause);
-		this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Properties
-	// --------------------------------------------------------------------------------------------
-
-	public String[] getTmpDirPaths() {
-		return tmpDirPaths;
-	}
-
-	public long getCleanupInterval() {
-		return cleanupInterval;
-	}
-
-	public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; }
-
-	public FiniteDuration getTimeout() {
-		return timeout;
-	}
-
-	public FiniteDuration getMaxRegistrationDuration() {
-		return maxRegistrationDuration;
-	}
-
-	public int getNumberOfSlots() {
-		return numberOfSlots;
-	}
-
-	public Configuration getConfiguration() {
-		return configuration;
-	}
-
-	public FiniteDuration getInitialRegistrationPause() {
-		return initialRegistrationPause;
-	}
-
-	public FiniteDuration getMaxRegistrationPause() {
-		return maxRegistrationPause;
-	}
-
-	public FiniteDuration getRefusedRegistrationPause() {
-		return refusedRegistrationPause;
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
new file mode 100644
index 0000000..32eb8c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.io.File;
+
+/**
+ * Configuration object for {@link TaskExecutor}.
+ *
+ * <p>Holds the number of slots, the temporary directories, the message timeout, the
+ * registration timing parameters and the library cache cleanup interval.
+ */
+public class TaskManagerConfiguration {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);
+
+	private final int numberSlots;
+
+	private final String[] tmpDirPaths;
+
+	private final Time timeout;
+
+	/** Maximum registration duration; {@code null} if the configured duration is not finite. */
+	private final Time maxRegistrationDuration;
+	private final Time initialRegistrationPause;
+	private final Time maxRegistrationPause;
+	private final Time refusedRegistrationPause;
+
+	private final long cleanupInterval;
+
+	/**
+	 * Creates a new task manager configuration.
+	 *
+	 * @param numberSlots              number of slots
+	 * @param tmpDirPaths              paths of the temporary directories, must not be null
+	 * @param timeout                  message timeout, must not be null
+	 * @param maxRegistrationDuration  maximum registration duration, or {@code null} if it is not finite
+	 * @param initialRegistrationPause initial registration pause, must not be null
+	 * @param maxRegistrationPause     maximum registration pause, must not be null
+	 * @param refusedRegistrationPause pause after a refused registration, must not be null
+	 * @param cleanupInterval          library cache cleanup interval
+	 */
+	public TaskManagerConfiguration(
+		int numberSlots,
+		String[] tmpDirPaths,
+		Time timeout,
+		Time maxRegistrationDuration,
+		Time initialRegistrationPause,
+		Time maxRegistrationPause,
+		Time refusedRegistrationPause,
+		long cleanupInterval) {
+
+		this.numberSlots = numberSlots;
+		this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
+		this.timeout = Preconditions.checkNotNull(timeout);
+		// fromConfiguration passes null for a non-finite duration, so null must be accepted here
+		this.maxRegistrationDuration = maxRegistrationDuration;
+		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
+		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
+		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
+		// primitive value: a null check would only box it
+		this.cleanupInterval = cleanupInterval;
+	}
+
+	public int getNumberSlots() {
+		return numberSlots;
+	}
+
+	public String[] getTmpDirPaths() {
+		return tmpDirPaths;
+	}
+
+	public Time getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * Returns the maximum registration duration.
+	 *
+	 * @return the maximum registration duration, or {@code null} if it is not finite
+	 */
+	public Time getMaxRegistrationDuration() {
+		return maxRegistrationDuration;
+	}
+
+	public Time getInitialRegistrationPause() {
+		return initialRegistrationPause;
+	}
+
+	public Time getMaxRegistrationPause() {
+		return maxRegistrationPause;
+	}
+
+	public Time getRefusedRegistrationPause() {
+		return refusedRegistrationPause;
+	}
+
+	public long getCleanupInterval() {
+		return cleanupInterval;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static factory methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Reads the task manager settings from the given configuration.
+	 *
+	 * @param configuration configuration to read the settings from
+	 * @return the task manager configuration built from the given settings
+	 * @throws IllegalArgumentException if a duration setting cannot be parsed or a registration
+	 *                                  pause is not finite
+	 */
+	public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
+		int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+		// -1 is treated like an unset value and mapped to a single slot
+		if (numberSlots == -1) {
+			numberSlots = 1;
+		}
+
+		final String[] tmpDirPaths = configuration.getString(
+			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+		final Time timeout;
+
+		try {
+			timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+		} catch (Exception e) {
+			throw new IllegalArgumentException(
+				"Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+					"'. Use formats like '50 s' or '1 min' to specify the timeout.", e);
+		}
+
+		LOG.info("Messages have a max timeout of {}", timeout);
+
+		// the configured value is multiplied by 1000 (presumably seconds to milliseconds -- TODO confirm)
+		final long cleanupInterval = configuration.getLong(
+			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+		final Time finiteRegistrationDuration;
+
+		try {
+			Duration maxRegistrationDuration = Duration.create(configuration.getString(
+				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+			if (maxRegistrationDuration.isFinite()) {
+				// convert via milliseconds so that sub-second values are not truncated to zero
+				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
+			} else {
+				// a non-finite duration is represented by null
+				finiteRegistrationDuration = null;
+			}
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Invalid format for parameter " +
+				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+		}
+
+		final Time initialRegistrationPause = parseFinitePause(
+			configuration,
+			ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+			ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+			"initial");
+
+		final Time maxRegistrationPause = parseFinitePause(
+			configuration,
+			ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+			ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE,
+			"maximum");
+
+		final Time refusedRegistrationPause = parseFinitePause(
+			configuration,
+			ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+			ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+			"refused");
+
+		return new TaskManagerConfiguration(
+			numberSlots,
+			tmpDirPaths,
+			timeout,
+			finiteRegistrationDuration,
+			initialRegistrationPause,
+			maxRegistrationPause,
+			refusedRegistrationPause,
+			cleanupInterval);
+	}
+
+	/**
+	 * Parses a registration pause that is required to be finite.
+	 *
+	 * @param configuration configuration to read the pause from
+	 * @param key           configuration key of the pause, also named in the parse error message
+	 * @param defaultValue  value used if the key is not set
+	 * @param pauseName     name of the pause used in the "must be finite" error message
+	 * @return the parsed pause with millisecond resolution
+	 * @throws IllegalArgumentException if the value cannot be parsed or is not finite
+	 */
+	private static Time parseFinitePause(
+		Configuration configuration,
+		String key,
+		String defaultValue,
+		String pauseName) {
+
+		final Duration pause;
+		try {
+			pause = Duration.create(configuration.getString(key, defaultValue));
+		} catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Invalid format for parameter " + key, e);
+		}
+
+		if (!pause.isFinite()) {
+			throw new IllegalArgumentException(
+				"The " + pauseName + " registration pause must be finite: " + pause);
+		}
+
+		// convert via milliseconds so that sub-second pauses are not truncated to zero
+		return Time.milliseconds(pause.toMillis());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
new file mode 100644
index 0000000..8ac0ddd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is the executable entry point for the task manager in yarn or standalone mode.
+ * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
+ * and starts them.
+ */
+public class TaskManagerRunner implements FatalErrorHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+
+	/** Lock held while the task manager is being shut down */
+	private final Object lock = new Object();
+
+	private final Configuration configuration;
+
+	private final ResourceID resourceID;
+
+	private final RpcService rpcService;
+
+	private final HighAvailabilityServices highAvailabilityServices;
+
+	/** Executor used to run future callbacks */
+	private final Executor executor;
+
+	private final TaskExecutor taskManager;
+
+	/**
+	 * Creates the task manager services from the given configuration and wires them into a new
+	 * {@link TaskExecutor}. The task executor is not started by the constructor; call
+	 * {@link #start()} for that.
+	 *
+	 * @param configuration The Flink configuration to derive the task manager settings from.
+	 * @param resourceID The resource ID of the task manager.
+	 * @param rpcService The RPC service the task executor runs in. Its address is used as the
+	 *                   task manager address.
+	 * @param highAvailabilityServices The high availability services handed to the task executor.
+	 * @param executor Currently not used: the executor of the given RPC service is used instead.
+	 * @throws Exception Thrown if the address of the RPC service cannot be resolved or if the
+	 *                   task manager services cannot be created.
+	 */
+	public TaskManagerRunner(
+		Configuration configuration,
+		ResourceID resourceID,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		Executor executor) throws Exception {
+
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.resourceID = Preconditions.checkNotNull(resourceID);
+		this.rpcService = Preconditions.checkNotNull(rpcService);
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+
+		// NOTE(review): the 'executor' argument is ignored here in favour of the RPC service's
+		// executor - confirm whether the parameter should be used or removed.
+		this.executor = rpcService.getExecutor();
+
+		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+
+		// 'false' means no local-only communication, i.e. a netty configuration is created
+		TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
+			configuration,
+			remoteAddress,
+			false);
+
+		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, resourceID);
+
+		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+
+		// this runner acts as the fatal error handler of the task executor
+		this.taskManager = new TaskExecutor(
+			taskManagerConfiguration,
+			taskManagerServices.getTaskManagerLocation(),
+			rpcService,
+			taskManagerServices.getMemoryManager(),
+			taskManagerServices.getIOManager(),
+			taskManagerServices.getNetworkEnvironment(),
+			highAvailabilityServices,
+			taskManagerServices.getMetricRegistry(),
+			this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Lifecycle management
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Starts the wrapped task executor.
+	 */
+	public void start() {
+		taskManager.start();
+	}
+
+	/**
+	 * Shuts the task manager down.
+	 *
+	 * @param cause The reason for the shut down. It is currently not evaluated.
+	 */
+	public void shutDown(Throwable cause) {
+		shutDownInternally();
+	}
+
+	/**
+	 * Shuts the wrapped task executor down while holding the runner's lock.
+	 */
+	protected void shutDownInternally() {
+		synchronized(lock) {
+			taskManager.shutDown();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  FatalErrorHandler methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Logs the fatal error and shuts the task manager down.
+	 *
+	 * @param exception The fatal error that occurred.
+	 */
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
+		shutDown(exception);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Create a RPC service for the task manager.
+	 *
+	 * <p>If no hostname is configured, the hostname is determined by finding an address from
+	 * which the resource manager leader can be reached.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @param haServices to use for the task manager hostname retrieval
+	 * @return The RPC service bound to the task manager hostname and the configured port
+	 * @throws IllegalStateException Thrown if the configured RPC port is outside of [0, 65535].
+	 * @throws Exception Thrown if the hostname cannot be determined or the RPC service cannot be created.
+	 */
+	public static RpcService createRpcService(
+		final Configuration configuration,
+		final HighAvailabilityServices haServices) throws Exception {
+
+		checkNotNull(configuration);
+		checkNotNull(haServices);
+
+		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+
+		if (taskManagerHostname != null) {
+			LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
+		} else {
+			Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
+
+			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
+				haServices.getResourceManagerLeaderRetriever(),
+				lookupTimeout);
+
+			taskManagerHostname = taskManagerAddress.getHostName();
+
+			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+				taskManagerHostname, taskManagerAddress.getHostAddress());
+		}
+
+		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+
+		// The check must state the VALID range: checkState fails if the condition is false.
+		// 0 is valid and lets the system choose a port automatically.
+		// '%s' is used for both arguments since it formats values of any type.
+		Preconditions.checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
+				"'%s' (port for the TaskManager actor system) : %s - Leave config parameter empty or " +
+				"use 0 to let the system choose port automatically.",
+			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+
+		return RpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
new file mode 100644
index 0000000..ff7f7d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
+ * {@link NetworkEnvironment} and the {@link MetricRegistry}.
+ */
+public class TaskManagerServices {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
+
+	/** TaskManager services */
+	private final TaskManagerLocation taskManagerLocation;
+	private final MemoryManager memoryManager;
+	private final IOManager ioManager;
+	private final NetworkEnvironment networkEnvironment;
+	private final MetricRegistry metricRegistry;
+
+	/**
+	 * Bundles the given services. None of the arguments may be null. Instances are created via
+	 * {@link #fromConfiguration(TaskManagerServicesConfiguration, ResourceID)}.
+	 */
+	private TaskManagerServices(
+		TaskManagerLocation taskManagerLocation,
+		MemoryManager memoryManager,
+		IOManager ioManager,
+		NetworkEnvironment networkEnvironment,
+		MetricRegistry metricRegistry) {
+
+		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+		this.memoryManager = Preconditions.checkNotNull(memoryManager);
+		this.ioManager = Preconditions.checkNotNull(ioManager);
+		this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Getter/Setter
+	// --------------------------------------------------------------------------------------------
+
+	public MemoryManager getMemoryManager() {
+		return memoryManager;
+	}
+
+	public IOManager getIOManager() {
+		return ioManager;
+	}
+
+	public NetworkEnvironment getNetworkEnvironment() {
+		return networkEnvironment;
+	}
+
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	public MetricRegistry getMetricRegistry() {
+		return metricRegistry;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static factory methods for task manager services
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates and returns the task manager services.
+	 *
+	 * <p>The services are created in a fixed order: the network environment is created and
+	 * started first, then the memory manager, the I/O manager and the metric registry.
+	 *
+	 * @param taskManagerServicesConfiguration task manager configuration
+	 * @param resourceID resource ID of the task manager
+	 * @return task manager components
+	 * @throws Exception Thrown if one of the services cannot be created or started.
+	 */
+	public static TaskManagerServices fromConfiguration(
+		TaskManagerServicesConfiguration taskManagerServicesConfiguration,
+		ResourceID resourceID) throws Exception {
+
+		final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration);
+
+		network.start();
+
+		// NOTE(review): if one of the following steps fails, the already started network
+		// environment is not shut down again - confirm whether callers clean up.
+
+		// the data port is only known after the network environment has been started
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+			resourceID,
+			taskManagerServicesConfiguration.getTaskManagerAddress(),
+			network.getConnectionManager().getDataPort());
+
+		// this call has to happen strictly after the network stack has been initialized
+		final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
+
+		// start the I/O manager, it will create some temp directories.
+		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
+
+		MetricRegistry metricsRegistry = new MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+
+		return new TaskManagerServices(taskManagerLocation, memoryManager, ioManager, network, metricsRegistry);
+	}
+
+	/**
+	 * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
+	 *
+	 * <p>If a memory size (in megabytes) is configured, it is used as is. Otherwise the size is
+	 * derived from the configured memory fraction: for heap memory relative to the currently
+	 * free heap, for off-heap memory relative to the maximum JVM heap size.
+	 *
+	 * @param taskManagerServicesConfiguration to create the memory manager from
+	 * @return Memory manager
+	 * @throws Exception Thrown if the memory for the memory manager cannot be allocated.
+	 */
+	private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {
+		// computing the amount of memory to use depends on how much memory is available
+		// it strictly needs to happen AFTER the network stack has been initialized
+
+		MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType();
+
+		// check if a value has been configured
+		long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
+
+		final long memorySize;
+
+		boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
+
+		if (configuredMemory > 0) {
+			if (preAllocateMemory) {
+				LOG.info("Using {} MB for managed memory." , configuredMemory);
+			} else {
+				LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+			}
+			memorySize = configuredMemory << 20; // megabytes to bytes
+		} else {
+			float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
+
+			if (memType == MemoryType.HEAP) {
+				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
+				if (preAllocateMemory) {
+					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+						memoryFraction , relativeMemSize >> 20);
+				} else {
+					LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+						"memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
+				}
+				memorySize = relativeMemSize;
+			} else if (memType == MemoryType.OFF_HEAP) {
+				// The maximum heap memory has been adjusted according to the fraction
+				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+				// heap = (1 - fraction) * total, hence off-heap = heap / (1 - fraction) * fraction
+				long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
+				if (preAllocateMemory) {
+					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+						memoryFraction, directMemorySize >> 20);
+				} else {
+					LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+						" memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
+				}
+				memorySize = directMemorySize;
+			} else {
+				throw new RuntimeException("No supported memory type detected.");
+			}
+		}
+
+		// now start the memory manager
+		final MemoryManager memoryManager;
+		try {
+			memoryManager = new MemoryManager(
+				memorySize,
+				taskManagerServicesConfiguration.getNumberOfSlots(),
+				taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
+				memType,
+				preAllocateMemory);
+		} catch (OutOfMemoryError e) {
+			// turn the error into an exception with a hint about which memory ran out
+			if (memType == MemoryType.HEAP) {
+				throw new Exception("OutOfMemory error (" + e.getMessage() +
+					") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+			} else if (memType == MemoryType.OFF_HEAP) {
+				throw new Exception("OutOfMemory error (" + e.getMessage() +
+					") while allocating the TaskManager off-heap memory (" + memorySize +
+					" bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+			} else {
+				throw e;
+			}
+		}
+		return memoryManager;
+	}
+
+	/**
+	 * Creates the {@link NetworkEnvironment} from the given {@link TaskManagerServicesConfiguration}.
+	 *
+	 * <p>If the network configuration contains a netty configuration, a netty based connection
+	 * manager and a {@link KvStateServer} are created. Otherwise a local connection manager is
+	 * used and no KvState server is created.
+	 *
+	 * @param taskManagerServicesConfiguration to construct the network environment from
+	 * @return Network environment
+	 * @throws IOException Thrown if one of the temporary directories is not usable.
+	 */
+	private static NetworkEnvironment createNetworkEnvironment(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws IOException {
+		// pre-start checks
+		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
+
+		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();
+
+		// null if the task manager only communicates locally
+		final NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
+
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+			networkEnvironmentConfiguration.numNetworkBuffers(),
+			networkEnvironmentConfiguration.networkBufferSize(),
+			networkEnvironmentConfiguration.memoryType());
+
+		ConnectionManager connectionManager;
+
+		if (nettyConfig != null) {
+			connectionManager = new NettyConnectionManager(nettyConfig);
+		} else {
+			connectionManager = new LocalConnectionManager();
+		}
+
+		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+		KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+		KvStateServer kvStateServer;
+
+		if (nettyConfig != null) {
+			// a configured thread count of 0 means "use the number of slots"
+			int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+			int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+			kvStateServer = new KvStateServer(
+				taskManagerServicesConfiguration.getTaskManagerAddress(),
+				networkEnvironmentConfiguration.queryServerPort(),
+				numNetworkThreads,
+				numQueryThreads,
+				kvStateRegistry,
+				new DisabledKvStateRequestStats());
+		} else {
+			kvStateServer = null;
+		}
+
+		// we start the network first, to make sure it can allocate its buffers first
+		final NetworkEnvironment network = new NetworkEnvironment(
+			networkBufferPool,
+			connectionManager,
+			resultPartitionManager,
+			taskEventDispatcher,
+			kvStateRegistry,
+			kvStateServer,
+			networkEnvironmentConfiguration.ioMode(),
+			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+			networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+		return network;
+	}
+
+	/**
+	 * Validates that all the directories denoted by the strings do actually exist, are proper
+	 * directories (not files), and are writable. For every valid directory the total and usable
+	 * space is logged on info level.
+	 *
+	 * @param tmpDirs The array of directory paths to check.
+	 * @throws IOException Thrown if any of the directories does not exist or is not writable
+	 *                     or is a file, rather than a directory.
+	 * @throws IllegalArgumentException Thrown if any of the paths is null or empty.
+	 */
+	private static void checkTempDirs(String[] tmpDirs) throws IOException {
+		for (int i = 0; i < tmpDirs.length; i++) {
+			final String dir = tmpDirs[i];
+
+			if (dir == null || dir.equals("")) {
+				// report the position of the offending entry in the array
+				throw new IllegalArgumentException("Temporary file directory #" + i + " is null or empty.");
+			}
+
+			final File file = new File(dir);
+			if (!file.exists()) {
+				throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+			}
+			if (!file.isDirectory()) {
+				throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+			}
+			if (!file.canWrite()) {
+				throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+			}
+
+			if (LOG.isInfoEnabled()) {
+				final long totalSpace = file.getTotalSpace();
+				final long usableSpace = file.getUsableSpace();
+
+				// compute the ratio on bytes, not on truncated GB values, and guard against
+				// a total space of 0 which would otherwise yield NaN
+				final double usablePercentage = totalSpace > 0 ?
+					(double) usableSpace / totalSpace * 100 : 0.0;
+
+				LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+					file.getAbsolutePath(), totalSpace >> 30, usableSpace >> 30, usablePercentage));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1e45aec/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
new file mode 100644
index 0000000..66d969a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.util.MathUtils;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configuration for the task manager services such as the network environment, the memory manager,
+ * the io manager and the metric registry
+ */
+public class TaskManagerServicesConfiguration {
+
+	private final InetAddress taskManagerAddress;
+
+	private final String[] tmpDirPaths;
+
+	private final int numberOfSlots;
+
+	private final NetworkEnvironmentConfiguration networkConfig;
+
+	/** Configured size of the managed memory in megabytes, -1 if not configured */
+	private final long configuredMemory;
+
+	private final boolean preAllocateMemory;
+
+	private final float memoryFraction;
+
+	private final MetricRegistryConfiguration metricRegistryConfiguration;
+
+	/**
+	 * Creates a new configuration from the given values. All reference typed arguments must
+	 * not be null.
+	 *
+	 * @param taskManagerAddress address of the task manager
+	 * @param tmpDirPaths paths of the temporary directories
+	 * @param networkConfig configuration of the network environment
+	 * @param numberOfSlots number of task slots
+	 * @param configuredMemory configured managed memory size
+	 * @param preAllocateMemory whether the managed memory is pre-allocated
+	 * @param memoryFraction fraction of the memory to use as managed memory
+	 * @param metricRegistryConfiguration configuration of the metric registry
+	 */
+	public TaskManagerServicesConfiguration(
+		InetAddress taskManagerAddress,
+		String[] tmpDirPaths,
+		NetworkEnvironmentConfiguration networkConfig,
+		int numberOfSlots,
+		long configuredMemory,
+		boolean preAllocateMemory,
+		float memoryFraction,
+		MetricRegistryConfiguration metricRegistryConfiguration) {
+
+		this.taskManagerAddress = checkNotNull(taskManagerAddress);
+		this.tmpDirPaths = checkNotNull(tmpDirPaths);
+		this.networkConfig = checkNotNull(networkConfig);
+
+		// primitive values cannot be null, hence no null check
+		this.numberOfSlots = numberOfSlots;
+		this.configuredMemory = configuredMemory;
+		this.preAllocateMemory = preAllocateMemory;
+		this.memoryFraction = memoryFraction;
+
+		this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Getter/Setter
+	// --------------------------------------------------------------------------------------------
+
+
+	public InetAddress getTaskManagerAddress() {
+		return taskManagerAddress;
+	}
+
+	public String[] getTmpDirPaths() {
+		return tmpDirPaths;
+	}
+
+	public NetworkEnvironmentConfiguration getNetworkConfig() {
+		return networkConfig;
+	}
+
+	public int getNumberOfSlots() {
+		return numberOfSlots;
+	}
+
+	public float getMemoryFraction() {
+		return memoryFraction;
+	}
+
+	public long getConfiguredMemory() {
+		return configuredMemory;
+	}
+
+	public boolean isPreAllocateMemory() {
+		return preAllocateMemory;
+	}
+
+	public MetricRegistryConfiguration getMetricRegistryConfiguration() {
+		return metricRegistryConfiguration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Parsing of Flink configuration
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Utility method to extract TaskManager config parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration The configuration.
+	 * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
+	 * @param localCommunication True, to skip initializing the network stack.
+	 *                                      Use only in cases where only one task manager runs.
+	 * @return TaskManagerServicesConfiguration that wraps the task manager address, the
+	 *         NetworkEnvironmentConfiguration, the memory settings, etc.
+	 * @throws IllegalConfigurationException Thrown if one of the config values is invalid.
+	 * @throws Exception Thrown if the memory segment factory has already been initialized for
+	 *                   a different memory type.
+	 */
+	public static TaskManagerServicesConfiguration fromConfiguration(
+		Configuration configuration,
+		InetAddress remoteAddress,
+		boolean localCommunication) throws Exception {
+
+		// we need this because many configs have been written with a "-1" entry
+		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		if (slots == -1) {
+			slots = 1;
+		}
+
+		// the directories may be separated by ',' or by the platform's path separator
+		final String[] tmpDirs = configuration.getString(
+			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+		final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
+			configuration,
+			localCommunication,
+			remoteAddress,
+			slots);
+
+		// extract memory settings
+		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+			"MemoryManager needs at least one MB of memory. " +
+				"If you leave this config parameter empty, the system automatically " +
+				"pick a fraction of the available memory.");
+
+		boolean preAllocateMemory = configuration.getBoolean(
+			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+
+		float memoryFraction = configuration.getFloat(
+			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+			ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+		checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
+			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+		final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
+
+		return new TaskManagerServicesConfiguration(
+			remoteAddress,
+			tmpDirs,
+			networkConfig,
+			slots,
+			configuredMemory,
+			preAllocateMemory,
+			memoryFraction,
+			metricRegistryConfiguration);
+	}
+
+	// --------------------------------------------------------------------------
+	//  Parsing and checking the TaskManager Configuration
+	// --------------------------------------------------------------------------
+
+	/**
+	 * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
+	 *
+	 * <p>As a side effect, the {@link MemorySegmentFactory} is initialized for the configured
+	 * memory type if it has not been initialized yet.
+	 *
+	 * @param configuration to create the network environment configuration from
+	 * @param localTaskManagerCommunication true if task manager communication is local
+	 * @param taskManagerAddress address of the task manager
+	 * @param slots to start the task manager with
+	 * @return Network environment configuration
+	 * @throws IllegalConfigurationException Thrown if one of the config values is invalid.
+	 * @throws Exception Thrown if the memory segment factory has already been initialized for
+	 *                   a different memory type.
+	 */
+	private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
+		Configuration configuration,
+		boolean localTaskManagerCommunication,
+		InetAddress taskManagerAddress,
+		int slots) throws Exception {
+
+		// ----> hosts / ports for communication and data exchange
+
+		int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+		// 0 is a valid value: it lets the system choose a port automatically
+		checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+			"Number of task slots must be at least one.");
+
+		final int numNetworkBuffers = configuration.getInteger(
+			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+		final int pageSize = configuration.getInteger(
+			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+		// check page size of for minimum size
+		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+
+		// check page size for power of two
+		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			"Memory segment size must be a power of 2.");
+
+		// check whether we use heap or off-heap memory
+		final MemoryType memType;
+		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+			memType = MemoryType.OFF_HEAP;
+		} else {
+			memType = MemoryType.HEAP;
+		}
+
+		// initialize the memory segment factory accordingly
+		if (memType == MemoryType.HEAP) {
+			if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+				throw new Exception("Memory type is set to heap memory, but memory segment " +
+					"factory has been initialized for off-heap memory segments");
+			}
+		} else {
+			if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
+				throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+					"factory has been initialized for heap memory segments");
+			}
+		}
+
+		// without a netty configuration the task manager only communicates locally
+		final NettyConfig nettyConfig;
+		if (!localTaskManagerCommunication) {
+			final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+			nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
+				taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
+		} else {
+			nettyConfig = null;
+		}
+
+		// Default spill I/O mode for intermediate results
+		final String syncOrAsync = configuration.getString(
+			ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+		// every value other than "async" selects the synchronous mode
+		final IOManager.IOMode ioMode;
+		if (syncOrAsync.equals("async")) {
+			ioMode = IOManager.IOMode.ASYNC;
+		} else {
+			ioMode = IOManager.IOMode.SYNC;
+		}
+
+		final int queryServerPort =  configuration.getInteger(
+			ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+		final int queryServerNetworkThreads =  configuration.getInteger(
+			ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+		final int queryServerQueryThreads =  configuration.getInteger(
+			ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+		// NOTE(review): the trailing 500 / 3000 are presumably the initial and maximum
+		// partition request backoff - confirm against NetworkEnvironmentConfiguration.
+		return new NetworkEnvironmentConfiguration(
+			numNetworkBuffers,
+			pageSize,
+			memType,
+			ioMode,
+			queryServerPort,
+			queryServerNetworkThreads,
+			queryServerQueryThreads,
+			nettyConfig,
+			500,
+			3000);
+	}
+
+	/**
+	 * Validates a condition for a config parameter and displays a standard exception, if the
+	 * the condition does not hold.
+	 *
+	 * @param condition     The condition that must hold. If the condition is false, an exception is thrown.
+	 * @param parameter     The parameter value. Will be shown in the exception message.
+	 * @param name          The name of the config parameter. Will be shown in the exception message.
+	 * @param errorMessage  The optional custom error message to append to the exception message.
+	 * @throws IllegalConfigurationException Thrown if the condition does not hold.
+	 */
+	private static void checkConfigParameter(
+		boolean condition,
+		Object parameter,
+		String name,
+		String errorMessage) {
+		if (!condition) {
+			throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
+		}
+	}
+}
+