You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:02 UTC

[20/50] [abbrv] flink git commit: [FLINK-4505] [cluster mngt] Separate TaskManager service configuration from TaskManagerConfiguration; Implement TaskManagerRunner

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
deleted file mode 100644
index 4f756fb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorConfiguration;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.NetUtils;
-
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-import com.typesafe.config.Config;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This class is the executable entry point for the task manager in yarn or standalone mode.
- * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
- * and starts them.
- */
-public class TaskManagerRunner {
-
-	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
-
-	/**
-	 * Constructs related components of the TaskManager and starts them.
-	 *
-	 * @param configuration                 The configuration for the TaskManager.
-	 * @param resourceID                    The id of the resource which the task manager will run on.
-	 * @param rpcService                    Optionally, The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
-	 *                                                 If none is given, then a RpcService is constructed from the configuration.
-	 * @param taskManagerHostname   Optionally, The hostname/address that describes the TaskManager's data location.
-	 *                                                 If none is given, it can be got from the configuration.
-	 * @param localTaskManagerCommunication      If true, the TaskManager will not initiate the TCP network stack.
-	 * @param haServices                    Optionally, a high availability service can be provided. If none is given,
-	 *                                                 then a HighAvailabilityServices is constructed from the configuration.
-	 */
-	public static void createAndStartComponents(
-		final Configuration configuration,
-		final ResourceID resourceID,
-		RpcService rpcService,
-		String taskManagerHostname,
-		boolean localTaskManagerCommunication,
-		HighAvailabilityServices haServices) throws Exception {
-
-		checkNotNull(configuration);
-		checkNotNull(resourceID);
-
-		if (taskManagerHostname == null || taskManagerHostname.isEmpty()) {
-			taskManagerHostname = selectNetworkInterface(configuration);
-		}
-
-		if (rpcService == null) {
-			// if no task manager port has been configured, use 0 (system will pick any free port)
-			final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
-			if (actorSystemPort < 0 || actorSystemPort > 65535) {
-				throw new IllegalConfigurationException("Invalid value for '" +
-					ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
-					"' (port for the TaskManager actor system) : " + actorSystemPort +
-					" - Leave config parameter empty or use 0 to let the system choose a port automatically.");
-			}
-			rpcService = createRpcService(configuration, taskManagerHostname, actorSystemPort);
-		}
-
-		if(haServices == null) {
-			// start high availability service to implement getResourceManagerLeaderRetriever method only
-			haServices = new HighAvailabilityServices() {
-				@Override
-				public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
-					return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-				}
-
-				@Override
-				public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
-					return null;
-				}
-
-				@Override
-				public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
-					return null;
-				}
-			};
-		}
-
-		createAndStartTaskManagerComponents(
-			configuration,
-			resourceID,
-			rpcService,
-			taskManagerHostname,
-			haServices,
-			localTaskManagerCommunication);
-	}
-
-	/**
-	 * <p/>
-	 * This method tries to select the network interface to use for the TaskManager
-	 * communication. The network interface is used both for the actor communication
-	 * (coordination) as well as for the data exchange between task managers. Unless
-	 * the hostname/interface is explicitly configured in the configuration, this
-	 * method will try out various interfaces and methods to connect to the JobManager
-	 * and select the one where the connection attempt is successful.
-	 * <p/>
-	 *
-	 * @param configuration    The configuration for the TaskManager.
-	 * @return  The host name under which the TaskManager communicates.
-	 */
-	private static String selectNetworkInterface(Configuration configuration) throws Exception {
-		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
-		if (taskManagerHostname != null) {
-			LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
-		} else {
-			LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
-			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
-			taskManagerHostname = taskManagerAddress.getHostName();
-			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
-				taskManagerHostname, taskManagerAddress.getHostAddress());
-		}
-
-		return taskManagerHostname;
-	}
-
-	/**
-	 * Utility method to create RPC service from configuration and hostname, port.
-	 *
-	 * @param configuration                 The configuration for the TaskManager.
-	 * @param taskManagerHostname   The hostname/address that describes the TaskManager's data location.
-	 * @param actorSystemPort           If true, the TaskManager will not initiate the TCP network stack.
-	 * @return   The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
-	 * @throws java.io.IOException      Thrown, if the actor system can not bind to the address
-	 * @throws java.lang.Exception      Thrown is some other error occurs while creating akka actor system
-	 */
-	private static RpcService createRpcService(Configuration configuration, String taskManagerHostname, int actorSystemPort)
-		throws Exception{
-
-		// Bring up the TaskManager actor system first, bind it to the given address.
-
-		LOG.info("Starting TaskManager actor system at " +
-			NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
-
-		final ActorSystem taskManagerSystem;
-		try {
-			Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
-			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
-			LOG.debug("Using akka configuration\n " + akkaConfig);
-			taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
-		} catch (Throwable t) {
-			if (t instanceof org.jboss.netty.channel.ChannelException) {
-				Throwable cause = t.getCause();
-				if (cause != null && t.getCause() instanceof java.net.BindException) {
-					String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
-					throw new IOException("Unable to bind TaskManager actor system to address " +
-						address + " - " + cause.getMessage(), t);
-				}
-			}
-			throw new Exception("Could not create TaskManager actor system", t);
-		}
-
-		// start akka rpc service based on actor system
-		final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
-		final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
-
-		return akkaRpcService;
-	}
-
-	/**
-	 * @param configuration                 The configuration for the TaskManager.
-	 * @param resourceID                    The id of the resource which the task manager will run on.
-	 * @param rpcService                    The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
-	 * @param taskManagerHostname   The hostname/address that describes the TaskManager's data location.
-	 * @param haServices                    Optionally, a high availability service can be provided. If none is given,
-	 *                                                  then a HighAvailabilityServices is constructed from the configuration.
-	 * @param localTaskManagerCommunication     If true, the TaskManager will not initiate the TCP network stack.
-	 * @throws IllegalConfigurationException        Thrown, if the given config contains illegal values.
-	 * @throws IOException      Thrown, if any of the I/O components (such as buffer pools, I/O manager, ...)
-	 *                                              cannot be properly started.
-	 * @throws Exception      Thrown is some other error occurs while parsing the configuration or
-	 *                                              starting the TaskManager components.
-	 */
-	private static void createAndStartTaskManagerComponents(
-		Configuration configuration,
-		ResourceID resourceID,
-		RpcService rpcService,
-		String taskManagerHostname,
-		HighAvailabilityServices haServices,
-		boolean localTaskManagerCommunication) throws Exception {
-
-		final TaskExecutorConfiguration taskManagerConfig = parseTaskManagerConfiguration(
-			configuration, taskManagerHostname, localTaskManagerCommunication);
-
-		TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
-			resourceID,
-			InetAddress.getByName(taskManagerHostname),
-			taskManagerConfig,
-			configuration);
-
-		final TaskExecutor taskExecutor = new TaskExecutor(
-			taskManagerConfig,
-			taskManagerComponents.getTaskManagerLocation(),
-			rpcService, taskManagerComponents.getMemoryManager(),
-			taskManagerComponents.getIOManager(),
-			taskManagerComponents.getNetworkEnvironment(),
-			haServices);
-
-		taskExecutor.start();
-	}
-
-	/**
-	 * Creates and returns the task manager components.
-	 *
-	 * @param resourceID resource ID of the task manager
-	 * @param taskManagerAddress address of the task manager
-	 * @param taskExecutorConfig task manager configuration
-	 * @param configuration of Flink
-	 * @return task manager components
-	 * @throws Exception
-	 */
-	private static TaskManagerComponents createTaskManagerComponents(
-		ResourceID resourceID,
-		InetAddress taskManagerAddress,
-		TaskExecutorConfiguration taskExecutorConfig,
-		Configuration configuration) throws Exception {
-
-		MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
-
-		// pre-start checks
-		checkTempDirs(taskExecutorConfig.getTmpDirPaths());
-
-		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
-
-		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
-			networkEnvironmentConfiguration.numNetworkBuffers(),
-			networkEnvironmentConfiguration.networkBufferSize(),
-			networkEnvironmentConfiguration.memoryType());
-
-		ConnectionManager connectionManager;
-
-		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
-			connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
-		} else {
-			connectionManager = new LocalConnectionManager();
-		}
-
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
-		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
-
-		KvStateRegistry kvStateRegistry = new KvStateRegistry();
-
-		KvStateServer kvStateServer;
-
-		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
-			NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
-
-			int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
-				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
-
-			int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
-				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
-
-			kvStateServer = new KvStateServer(
-				taskManagerAddress,
-				networkEnvironmentConfiguration.queryServerPort(),
-				numNetworkThreads,
-				numQueryThreads,
-				kvStateRegistry,
-				new DisabledKvStateRequestStats());
-		} else {
-			kvStateServer = null;
-		}
-
-		// we start the network first, to make sure it can allocate its buffers first
-		final NetworkEnvironment network = new NetworkEnvironment(
-			networkBufferPool,
-			connectionManager,
-			resultPartitionManager,
-			taskEventDispatcher,
-			kvStateRegistry,
-			kvStateServer,
-			networkEnvironmentConfiguration.ioMode(),
-			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
-			networkEnvironmentConfiguration.partitinRequestMaxBackoff());
-
-		network.start();
-
-		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
-			resourceID,
-			taskManagerAddress,
-			network.getConnectionManager().getDataPort());
-
-		// computing the amount of memory to use depends on how much memory is available
-		// it strictly needs to happen AFTER the network stack has been initialized
-
-		// check if a value has been configured
-		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-			"MemoryManager needs at least one MB of memory. " +
-				"If you leave this config parameter empty, the system automatically " +
-				"pick a fraction of the available memory.");
-
-		final long memorySize;
-		boolean preAllocateMemory = configuration.getBoolean(
-			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
-		if (configuredMemory > 0) {
-			if (preAllocateMemory) {
-				LOG.info("Using {} MB for managed memory." , configuredMemory);
-			} else {
-				LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
-			}
-			memorySize = configuredMemory << 20; // megabytes to bytes
-		} else {
-			float fraction = configuration.getFloat(
-				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-				ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-			checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
-				ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-				"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
-
-			if (memType == MemoryType.HEAP) {
-				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
-				if (preAllocateMemory) {
-					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
-						fraction , relativeMemSize >> 20);
-				} else {
-					LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
-						"memory will be allocated lazily." , fraction , relativeMemSize >> 20);
-				}
-				memorySize = relativeMemSize;
-			} else if (memType == MemoryType.OFF_HEAP) {
-				// The maximum heap memory has been adjusted according to the fraction
-				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
-				long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
-				if (preAllocateMemory) {
-					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
-						fraction, directMemorySize >> 20);
-				} else {
-					LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
-						" memory will be allocated lazily.", fraction, directMemorySize >> 20);
-				}
-				memorySize = directMemorySize;
-			} else {
-				throw new RuntimeException("No supported memory type detected.");
-			}
-		}
-
-		// now start the memory manager
-		final MemoryManager memoryManager;
-		try {
-			memoryManager = new MemoryManager(
-				memorySize,
-				taskExecutorConfig.getNumberOfSlots(),
-				taskExecutorConfig.getNetworkConfig().networkBufferSize(),
-				memType,
-				preAllocateMemory);
-		} catch (OutOfMemoryError e) {
-			if (memType == MemoryType.HEAP) {
-				throw new Exception("OutOfMemory error (" + e.getMessage() +
-					") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
-			} else if (memType == MemoryType.OFF_HEAP) {
-				throw new Exception("OutOfMemory error (" + e.getMessage() +
-					") while allocating the TaskManager off-heap memory (" + memorySize +
-					" bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
-			} else {
-				throw e;
-			}
-		}
-
-		// start the I/O manager, it will create some temp directories.
-		final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
-
-		return new TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
-	}
-
-	// --------------------------------------------------------------------------
-	//  Parsing and checking the TaskManager Configuration
-	// --------------------------------------------------------------------------
-
-	/**
-	 * Utility method to extract TaskManager config parameters from the configuration and to
-	 * sanity check them.
-	 *
-	 * @param configuration                         The configuration.
-	 * @param taskManagerHostname           The host name under which the TaskManager communicates.
-	 * @param localTaskManagerCommunication             True, to skip initializing the network stack.
-	 *                                      Use only in cases where only one task manager runs.
-	 * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
-	 */
-	private static TaskExecutorConfiguration parseTaskManagerConfiguration(
-		Configuration configuration,
-		String taskManagerHostname,
-		boolean localTaskManagerCommunication) throws Exception {
-
-		// ------- read values from the config and check them ---------
-		//                      (a lot of them)
-
-		// ----> hosts / ports for communication and data exchange
-
-		int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
-
-		checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
-
-		InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
-		final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
-
-		// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
-
-		// we need this because many configs have been written with a "-1" entry
-		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-		if (slots == -1) {
-			slots = 1;
-		}
-
-		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-			"Number of task slots must be at least one.");
-
-		final int numNetworkBuffers = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
-
-		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
-
-		final int pageSize = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
-
-		// check page size of for minimum size
-		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
-
-		// check page size for power of two
-		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			"Memory segment size must be a power of 2.");
-
-		// check whether we use heap or off-heap memory
-		final MemoryType memType;
-		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
-			memType = MemoryType.OFF_HEAP;
-		} else {
-			memType = MemoryType.HEAP;
-		}
-
-		// initialize the memory segment factory accordingly
-		if (memType == MemoryType.HEAP) {
-			if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
-				throw new Exception("Memory type is set to heap memory, but memory segment " +
-					"factory has been initialized for off-heap memory segments");
-			}
-		} else {
-			if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
-				throw new Exception("Memory type is set to off-heap memory, but memory segment " +
-					"factory has been initialized for heap memory segments");
-			}
-		}
-
-		final String[] tmpDirs = configuration.getString(
-			ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
-
-		final NettyConfig nettyConfig;
-		if (!localTaskManagerCommunication) {
-			nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
-				taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
-		} else {
-			nettyConfig = null;
-		}
-
-		// Default spill I/O mode for intermediate results
-		final String syncOrAsync = configuration.getString(
-			ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
-
-		final IOManager.IOMode ioMode;
-		if (syncOrAsync.equals("async")) {
-			ioMode = IOManager.IOMode.ASYNC;
-		} else {
-			ioMode = IOManager.IOMode.SYNC;
-		}
-
-		final int queryServerPort =  configuration.getInteger(
-			ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
-			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
-
-		final int queryServerNetworkThreads =  configuration.getInteger(
-			ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
-			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
-
-		final int queryServerQueryThreads =  configuration.getInteger(
-			ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
-			ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
-
-		final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
-			numNetworkBuffers,
-			pageSize,
-			memType,
-			ioMode,
-			queryServerPort,
-			queryServerNetworkThreads,
-			queryServerQueryThreads,
-			Option.apply(nettyConfig),
-			500,
-			3000);
-
-		// ----> timeouts, library caching, profiling
-
-		final FiniteDuration timeout;
-		try {
-			timeout = AkkaUtils.getTimeout(configuration);
-		} catch (Exception e) {
-			throw new IllegalArgumentException(
-				"Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
-					"'.Use formats like '50 s' or '1 min' to specify the timeout.");
-		}
-		LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
-
-		final long cleanupInterval = configuration.getLong(
-			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
-		final FiniteDuration finiteRegistrationDuration;
-		try {
-			Duration maxRegistrationDuration = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
-			if (maxRegistrationDuration.isFinite()) {
-				finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				finiteRegistrationDuration = null;
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
-		}
-
-		final FiniteDuration initialRegistrationPause;
-		try {
-			Duration pause = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-				ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
-			if (pause.isFinite()) {
-				initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-		}
-
-		final FiniteDuration maxRegistrationPause;
-		try {
-			Duration pause = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
-				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
-			if (pause.isFinite()) {
-				maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-		}
-
-		final FiniteDuration refusedRegistrationPause;
-		try {
-			Duration pause = Duration.create(configuration.getString(
-				ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
-				ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
-			if (pause.isFinite()) {
-				refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
-			} else {
-				throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
-			}
-		} catch (NumberFormatException e) {
-			throw new IllegalArgumentException("Invalid format for parameter " +
-				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
-		}
-
-		return new TaskExecutorConfiguration(
-			tmpDirs,
-			cleanupInterval,
-			networkConfig,
-			timeout,
-			finiteRegistrationDuration,
-			slots,
-			configuration,
-			initialRegistrationPause,
-			maxRegistrationPause,
-			refusedRegistrationPause);
-	}
-
-	/**
-	 * Validates a condition for a config parameter and displays a standard exception, if the
-	 * the condition does not hold.
-	 *
-	 * @param condition             The condition that must hold. If the condition is false, an exception is thrown.
-	 * @param parameter         The parameter value. Will be shown in the exception message.
-	 * @param name              The name of the config parameter. Will be shown in the exception message.
-	 * @param errorMessage  The optional custom error message to append to the exception message.
-	 */
-	private static void checkConfigParameter(
-		boolean condition,
-		Object parameter,
-		String name,
-		String errorMessage) {
-		if (!condition) {
-			throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
-		}
-	}
-
-	/**
-	 * Validates that all the directories denoted by the strings do actually exist, are proper
-	 * directories (not files), and are writable.
-	 *
-	 * @param tmpDirs       The array of directory paths to check.
-	 * @throws Exception    Thrown if any of the directories does not exist or is not writable
-	 *                   or is a file, rather than a directory.
-	 */
-	private static void checkTempDirs(String[] tmpDirs) throws IOException {
-		for (String dir : tmpDirs) {
-			if (dir != null && !dir.equals("")) {
-				File file = new File(dir);
-				if (!file.exists()) {
-					throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
-				}
-				if (!file.isDirectory()) {
-					throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
-				}
-				if (!file.canWrite()) {
-					throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
-				}
-
-				if (LOG.isInfoEnabled()) {
-					long totalSpaceGb = file.getTotalSpace() >> 30;
-					long usableSpaceGb = file.getUsableSpace() >> 30;
-					double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100;
-					String path = file.getAbsolutePath();
-					LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
-						path, totalSpaceGb, usableSpaceGb, usablePercentage));
-				}
-			} else {
-				throw new IllegalArgumentException("Temporary file directory #$id is null.");
-			}
-		}
-	}
-
-	private static class TaskManagerComponents {
-		private final TaskManagerLocation taskManagerLocation;
-		private final MemoryManager memoryManager;
-		private final IOManager ioManager;
-		private final NetworkEnvironment networkEnvironment;
-
-		private TaskManagerComponents(
-			TaskManagerLocation taskManagerLocation,
-			MemoryManager memoryManager,
-			IOManager ioManager,
-			NetworkEnvironment networkEnvironment) {
-
-			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
-			this.memoryManager = Preconditions.checkNotNull(memoryManager);
-			this.ioManager = Preconditions.checkNotNull(ioManager);
-			this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
-		}
-
-		public MemoryManager getMemoryManager() {
-			return memoryManager;
-		}
-
-		public IOManager getIOManager() {
-			return ioManager;
-		}
-
-		public NetworkEnvironment getNetworkEnvironment() {
-			return networkEnvironment;
-		}
-
-		public TaskManagerLocation getTaskManagerLocation() {
-			return taskManagerLocation;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index b6d9306..42655a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -170,6 +171,12 @@ public class LeaderRetrievalUtils {
 	}
 
 	public static InetAddress findConnectingAddress(
+		LeaderRetrievalService leaderRetrievalService,
+		Time timeout) throws LeaderRetrievalException {
+		return findConnectingAddress(leaderRetrievalService, new FiniteDuration(timeout.getSize(), timeout.getUnit()));
+	}
+
+	public static InetAddress findConnectingAddress(
 			LeaderRetrievalService leaderRetrievalService,
 			FiniteDuration timeout) throws LeaderRetrievalException {
 		ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index bd3af33..84f5ac7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -94,6 +94,10 @@ object AkkaUtils {
     createActorSystem(getDefaultAkkaConfig)
   }
 
+  def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
+    getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
+  }
+
   /**
    * Creates an akka config with the provided configuration values. If the listening address is
    * specified, then the actor system will listen on the respective address.

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 893eaa8..97aae34 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -30,6 +30,6 @@ case class NetworkEnvironmentConfiguration(
   queryServerPort: Int,
   queryServerNetworkThreads: Int,
   queryServerQueryThreads: Int,
-  nettyConfig: Option[NettyConfig] = None,
+  nettyConfig: NettyConfig = null,
   partitionRequestInitialBackoff: Int = 500,
   partitinRequestMaxBackoff: Int = 3000)

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index af2b38f..79670a4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1932,7 +1932,7 @@ object TaskManager {
       netConfig.networkBufferSize,
       netConfig.memoryType)
 
-    val connectionManager = netConfig.nettyConfig match {
+    val connectionManager = Option(netConfig.nettyConfig) match {
       case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
       case None => new LocalConnectionManager()
     }
@@ -1942,7 +1942,7 @@ object TaskManager {
 
     val kvStateRegistry = new KvStateRegistry()
 
-    val kvStateServer = netConfig.nettyConfig match {
+    val kvStateServer = Option(netConfig.nettyConfig) match {
       case Some(nettyConfig) =>
 
         val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
@@ -2274,7 +2274,7 @@ object TaskManager {
       queryServerPort,
       queryServerNetworkThreads,
       queryServerQueryThreads,
-      nettyConfig)
+      nettyConfig.getOrElse(null))
 
     // ----> timeouts, library caching, profiling
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a9ad75d..cc50b66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
-import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
@@ -75,7 +73,7 @@ public class NetworkEnvironmentTest {
 			0,
 			0,
 			0,
-			Some.<NettyConfig>empty(),
+			null,
 			0,
 			0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 5b8e6e6..2a004c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -30,7 +30,6 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.net.InetAddress;
 import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 26218dd..9c1f288 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -26,8 +26,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ public class TaskExecutorTest extends TestLogger {
 		try {
 			// register a mock resource manager gateway
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
-			TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class);
-			PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
+			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 			rpc.registerGateway(resourceManagerAddress, rmGateway);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
@@ -61,12 +62,15 @@ public class TaskExecutorTest extends TestLogger {
 			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
 
 			TaskExecutor taskManager = new TaskExecutor(
-				taskExecutorConfiguration,
+				taskManagerServicesConfiguration,
 				taskManagerLocation,
-				rpc, mock(MemoryManager.class),
+				rpc,
+				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
-				haServices);
+				haServices,
+				mock(MetricRegistry.class),
+				mock(FatalErrorHandler.class));
 
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();
@@ -101,19 +105,22 @@ public class TaskExecutorTest extends TestLogger {
 			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
 			haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
-			TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class);
-			PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
+			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 
 			TaskExecutor taskManager = new TaskExecutor(
-				taskExecutorConfiguration,
+				taskManagerServicesConfiguration,
 				taskManagerLocation,
-				rpc, mock(MemoryManager.class),
+				rpc,
+				mock(MemoryManager.class),
 				mock(IOManager.class),
 				mock(NetworkEnvironment.class),
-				haServices);
+				haServices,
+				mock(MetricRegistry.class),
+				mock(FatalErrorHandler.class));
 
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 1f93e9b..627a25a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -105,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
 					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
-					Option.<NettyConfig>empty(), 0, 0);
+					null, 0, 0);
 
 			ResourceID taskManagerId = ResourceID.generate();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index acfbbfd..c0d0455 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;