You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:53 UTC

[48/50] [abbrv] flink git commit: [hotfix] Replace TaskManager.createTaskManagerComponents by TaskManagerServices

[hotfix] Replace TaskManager.createTaskManagerComponents by TaskManagerServices


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/009ba353
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/009ba353
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/009ba353

Branch: refs/heads/flip-6
Commit: 009ba3537eabd846893b6e46dea9540f529e46d3
Parents: eebe2c3
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 28 14:04:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:46:57 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosTaskManager.scala     |   3 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  25 +-
 .../TaskManagerServicesConfiguration.java       |   2 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  47 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 605 ++-----------------
 .../taskmanager/TaskManagerConfiguration.scala  |  56 --
 ...askManagerComponentsStartupShutdownTest.java |  24 +-
 .../testingUtils/TestingTaskManager.scala       |   3 +-
 .../runtime/testingUtils/TestingUtils.scala     |   1 -
 .../flink/yarn/TestingYarnTaskManager.scala     |   3 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   3 +-
 11 files changed, 126 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 3972a57..e8d6a58 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 32eb8c1..f58af77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -41,6 +41,7 @@ public class TaskManagerConfiguration {
 	private final String[] tmpDirPaths;
 
 	private final Time timeout;
+	// null indicates an infinite duration
 	private final Time maxRegistrationDuration;
 	private final Time initialRegistrationPause;
 	private final Time maxRegistrationPause;
@@ -48,6 +49,9 @@ public class TaskManagerConfiguration {
 
 	private final long cleanupInterval;
 
+	// TODO: remove necessity for complete configuration object
+	private final Configuration configuration;
+
 	public TaskManagerConfiguration(
 		int numberSlots,
 		String[] tmpDirPaths,
@@ -56,16 +60,18 @@ public class TaskManagerConfiguration {
 		Time initialRegistrationPause,
 		Time maxRegistrationPause,
 		Time refusedRegistrationPause,
-		long cleanupInterval) {
+		long cleanupInterval,
+		Configuration configuration) {
 
 		this.numberSlots = numberSlots;
 		this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
 		this.timeout = Preconditions.checkNotNull(timeout);
-		this.maxRegistrationDuration = Preconditions.checkNotNull(maxRegistrationDuration);
+		this.maxRegistrationDuration = maxRegistrationDuration;
 		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
 		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
 		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
 		this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
+		this.configuration = Preconditions.checkNotNull(configuration);
 	}
 
 	public int getNumberSlots() {
@@ -100,6 +106,10 @@ public class TaskManagerConfiguration {
 		return cleanupInterval;
 	}
 
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods
 	// --------------------------------------------------------------------------------------------
@@ -138,7 +148,7 @@ public class TaskManagerConfiguration {
 				ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
 				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
 			if (maxRegistrationDuration.isFinite()) {
-				finiteRegistrationDuration = Time.seconds(maxRegistrationDuration.toSeconds());
+				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
 			} else {
 				finiteRegistrationDuration = null;
 			}
@@ -153,7 +163,7 @@ public class TaskManagerConfiguration {
 				ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
 				ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
 			if (pause.isFinite()) {
-				initialRegistrationPause = Time.seconds(pause.toSeconds());
+				initialRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
 				throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
 			}
@@ -168,7 +178,7 @@ public class TaskManagerConfiguration {
 				ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
 				ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
 			if (pause.isFinite()) {
-				maxRegistrationPause = Time.seconds(pause.toSeconds());
+				maxRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
 				throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
 			}
@@ -183,7 +193,7 @@ public class TaskManagerConfiguration {
 				ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
 				ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
 			if (pause.isFinite()) {
-				refusedRegistrationPause = Time.seconds(pause.toSeconds());
+				refusedRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
 				throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
 			}
@@ -200,6 +210,7 @@ public class TaskManagerConfiguration {
 			initialRegistrationPause,
 			maxRegistrationPause,
 			refusedRegistrationPause,
-			cleanupInterval);
+			cleanupInterval,
+			configuration);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 66d969a..80dfc09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -208,7 +208,7 @@ public class TaskManagerServicesConfiguration {
 		int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
 			ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
 
-		checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+		checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
 			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
 
 		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27c9dd9..d29f73b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.net.InetAddress
 import java.util.concurrent.ExecutorService
 
 import akka.actor.{ActorRef, ActorSystem, Props}
@@ -43,8 +44,9 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
-import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
+import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils}
 
 import scala.concurrent.Await
 import scala.concurrent.duration.FiniteDuration
@@ -198,31 +200,32 @@ class LocalFlinkMiniCluster(
 
     val resourceID = ResourceID.generate() // generate random resource id
 
-    val (taskManagerConfig,
-    taskManagerLocation,
-    memoryManager,
-    ioManager,
-    network,
-    leaderRetrievalService,
-    metricsRegistry) = TaskManager.createTaskManagerComponents(
+    val taskManagerAddress = InetAddress.getByName(hostname)
+
+    val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config)
+    val taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
       config,
-      resourceID,
-      hostname, // network interface to bind to
-      localExecution, // start network stack?
-      Some(createLeaderRetrievalService()))
+      taskManagerAddress,
+      localExecution)
+
+    val taskManagerServices = TaskManagerServices.fromConfiguration(
+      taskManagerServicesConfiguration,
+      resourceID)
+
+    val metricRegistry = taskManagerServices.getMetricRegistry()
 
     val props = getTaskManagerProps(
       taskManagerClass,
-      taskManagerConfig,
+      taskManagerConfiguration,
       resourceID,
-      taskManagerLocation,
-      memoryManager,
-      ioManager,
-      network,
-      leaderRetrievalService,
-      metricsRegistry)
-
-    metricsRegistry.startQueryService(system)
+      taskManagerServices.getTaskManagerLocation(),
+      taskManagerServices.getMemoryManager(),
+      taskManagerServices.getIOManager(),
+      taskManagerServices.getNetworkEnvironment,
+      createLeaderRetrievalService(),
+      metricRegistry)
+
+    metricRegistry.startQueryService(system)
 
     system.actorOf(props, taskManagerActorName)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 79670a4..d16c1b0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,7 +37,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
 import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -51,12 +50,8 @@ import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
-import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker}
-import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager}
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -66,16 +61,15 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStack
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.KvStateRegistry
-import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
 import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
 import org.apache.flink.runtime.security.SecurityContext
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{MathUtils, NetUtils}
+import org.apache.flink.util.NetUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
@@ -142,7 +136,7 @@ class TaskManager(
   override val log = Logger(getClass)
 
   /** The timeout for all actor ask futures */
-  protected val askTimeout = new Timeout(config.timeout)
+  protected val askTimeout = new Timeout(config.getTimeout().getSize, config.getTimeout().getUnit())
 
   /** The TaskManager's physical execution resources */
   protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize())
@@ -154,7 +148,7 @@ class TaskManager(
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache(config.configuration)
+  protected val fileCache = new FileCache(config.getConfiguration())
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -190,8 +184,8 @@ class TaskManager(
 
   private val runtimeInfo = new TaskManagerRuntimeInfo(
        location.getHostname(),
-       new UnmodifiableConfiguration(config.configuration),
-       config.tmpDirPaths)
+       new UnmodifiableConfiguration(config.getConfiguration()),
+       config.getTmpDirPaths())
 
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
@@ -614,7 +608,9 @@ class TaskManager(
             )
 
             // the next timeout computes via exponential backoff with cap
-            val nextTimeout = (timeout * 2).min(config.maxRegistrationPause)
+            val nextTimeout = (timeout * 2).min(new FiniteDuration(
+              config.getMaxRegistrationPause().toMilliseconds,
+              TimeUnit.MILLISECONDS))
 
             // schedule (with our timeout s delay) a check triggers a new registration
             // attempt, if we are not registered by then
@@ -688,10 +684,14 @@ class TaskManager(
 
           if(jobManagerAkkaURL.isDefined) {
             // try the registration again after some time
-            val delay: FiniteDuration = config.refusedRegistrationPause
-            val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
-              timeout => timeout + delay fromNow
-            }
+            val delay: FiniteDuration = new FiniteDuration(
+              config.getRefusedRegistrationPause().getSize(),
+              config.getRefusedRegistrationPause().getUnit())
+            val deadline: Option[Deadline] = Option(config.getMaxRegistrationDuration())
+              .map {
+                duration => new FiniteDuration(duration.getSize(), duration.getUnit()) +
+                  delay fromNow
+              }
 
             // start a new registration run
             currentRegistrationRun = UUID.randomUUID()
@@ -703,7 +703,9 @@ class TaskManager(
                 self ! decorateMessage(
                   TriggerTaskManagerRegistration(
                     jobManagerAkkaURL.get,
-                    config.initialRegistrationPause,
+                    new FiniteDuration(
+                      config.getInitialRegistrationPause().getSize(),
+                      config.getInitialRegistrationPause().getUnit()),
                     deadline,
                     1,
                     currentRegistrationRun)
@@ -842,7 +844,7 @@ class TaskManager(
       requestType: LogTypeRequest,
       jobManager: ActorRef)
     : Unit = {
-    val logFilePathOption = Option(config.configuration.getString(
+    val logFilePathOption = Option(config.getConfiguration().getString(
       ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")));
     logFilePathOption match {
       case None => throw new IOException("TaskManager log files are unavailable. " +
@@ -975,9 +977,10 @@ class TaskManager(
       log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
 
       try {
-        val blobcache = new BlobCache(address, config.configuration)
+        val blobcache = new BlobCache(address, config.getConfiguration())
         blobService = Option(blobcache)
-        libraryCacheManager = Some(new BlobLibraryCacheManager(blobcache, config.cleanupInterval))
+        libraryCacheManager = Some(
+          new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
       }
       catch {
         case e: Exception =>
@@ -1160,7 +1163,9 @@ class TaskManager(
         tdd.getJobID,
         tdd.getVertexID,
         tdd.getExecutionId,
-        config.timeout)
+        new FiniteDuration(
+          config.getTimeout().getSize(),
+          config.getTimeout().getUnit()))
 
       val task = new Task(
         tdd,
@@ -1427,7 +1432,8 @@ class TaskManager(
   def triggerTaskManagerRegistration(): Unit = {
     if(jobManagerAkkaURL.isDefined) {
       // begin attempts to reconnect
-      val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
+      val deadline: Option[Deadline] = Option(config.getMaxRegistrationDuration())
+        .map{ duration => new FiniteDuration(duration.getSize(), duration.getUnit()).fromNow }
 
       // start a new registration run
       currentRegistrationRun = UUID.randomUUID()
@@ -1437,7 +1443,9 @@ class TaskManager(
       self ! decorateMessage(
         TriggerTaskManagerRegistration(
           jobManagerAkkaURL.get,
-          config.initialRegistrationPause,
+          new FiniteDuration(
+            config.getInitialRegistrationPause().getSize(),
+            config.getInitialRegistrationPause().getUnit()),
           deadline,
           1,
           currentRegistrationRun)
@@ -1844,32 +1852,37 @@ object TaskManager {
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
 
-    val (taskManagerConfig,
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
-      leaderRetrievalService,
-      metricsRegistry) = createTaskManagerComponents(
-      configuration,
-      resourceID,
-      taskManagerHostname,
-      localTaskManagerCommunication,
-      leaderRetrievalServiceOption)
+    val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
+
+    val taskManagerServicesConfiguration = TaskManagerServicesConfiguration
+      .fromConfiguration(configuration, taskManagerAddress, false)
+
+    val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration)
+
+    val taskManagerServices = TaskManagerServices.fromConfiguration(
+      taskManagerServicesConfiguration,
+      resourceID)
+
+    val metricRegistry = taskManagerServices.getMetricRegistry()
+
+    val leaderRetrievalService = leaderRetrievalServiceOption match {
+      case Some(lrs) => lrs
+      case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+    }
 
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = getTaskManagerProps(
       taskManagerClass,
-      taskManagerConfig,
+      taskManagerConfiguration,
       resourceID,
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
+      taskManagerServices.getTaskManagerLocation(),
+      taskManagerServices.getMemoryManager(),
+      taskManagerServices.getIOManager(),
+      taskManagerServices.getNetworkEnvironment(),
       leaderRetrievalService,
-      metricsRegistry)
+      metricRegistry)
 
-    metricsRegistry.startQueryService(actorSystem)
+    metricRegistry.startQueryService(actorSystem)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1896,211 +1909,11 @@ object TaskManager {
       memoryManager,
       ioManager,
       networkEnvironment,
-      taskManagerConfig.numberOfSlots,
+      taskManagerConfig.getNumberSlots(),
       leaderRetrievalService,
       metricsRegistry)
   }
 
-  def createTaskManagerComponents(
-    configuration: Configuration,
-    resourceID: ResourceID,
-    taskManagerHostname: String,
-    localTaskManagerCommunication: Boolean,
-    leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
-      (TaskManagerConfiguration,
-      TaskManagerLocation,
-      MemoryManager,
-      IOManager,
-      NetworkEnvironment,
-      LeaderRetrievalService,
-      FlinkMetricRegistry) = {
-
-    val (taskManagerConfig : TaskManagerConfiguration,
-    netConfig: NetworkEnvironmentConfiguration,
-    taskManagerAddress: InetSocketAddress,
-    memType: MemoryType
-      ) = parseTaskManagerConfiguration(
-      configuration,
-      taskManagerHostname,
-      localTaskManagerCommunication)
-
-    // pre-start checks
-    checkTempDirs(taskManagerConfig.tmpDirPaths)
-
-    val networkBufferPool = new NetworkBufferPool(
-      netConfig.numNetworkBuffers,
-      netConfig.networkBufferSize,
-      netConfig.memoryType)
-
-    val connectionManager = Option(netConfig.nettyConfig) match {
-      case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
-      case None => new LocalConnectionManager()
-    }
-
-    val resultPartitionManager = new ResultPartitionManager()
-    val taskEventDispatcher = new TaskEventDispatcher()
-
-    val kvStateRegistry = new KvStateRegistry()
-
-    val kvStateServer = Option(netConfig.nettyConfig) match {
-      case Some(nettyConfig) =>
-
-        val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
-          nettyConfig.getNumberOfSlots
-        } else {
-          netConfig.queryServerNetworkThreads
-        }
-
-        val numQueryThreads = if (netConfig.queryServerQueryThreads == 0) {
-          nettyConfig.getNumberOfSlots
-        } else {
-          netConfig.queryServerQueryThreads
-        }
-
-        new KvStateServer(
-          taskManagerAddress.getAddress(),
-          netConfig.queryServerPort,
-          numNetworkThreads,
-          numQueryThreads,
-          kvStateRegistry,
-          new DisabledKvStateRequestStats())
-
-      case None => null
-    }
-
-    // we start the network first, to make sure it can allocate its buffers first
-    val network = new NetworkEnvironment(
-      networkBufferPool,
-      connectionManager,
-      resultPartitionManager,
-      taskEventDispatcher,
-      kvStateRegistry,
-      kvStateServer,
-      netConfig.ioMode,
-      netConfig.partitionRequestInitialBackoff,
-      netConfig.partitinRequestMaxBackoff)
-
-    network.start()
-
-    val taskManagerLocation = new TaskManagerLocation(
-      resourceID,
-      taskManagerAddress.getAddress(),
-      network.getConnectionManager().getDataPort())
-
-    // computing the amount of memory to use depends on how much memory is available
-    // it strictly needs to happen AFTER the network stack has been initialized
-
-    // check if a value has been configured
-    val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
-    checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-                         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-                         "MemoryManager needs at least one MB of memory. " +
-                           "If you leave this config parameter empty, the system automatically " +
-                           "pick a fraction of the available memory.")
-
-
-    val preAllocateMemory = configuration.getBoolean(
-      ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE)
-
-    val memorySize = if (configuredMemory > 0) {
-      if (preAllocateMemory) {
-        LOG.info(s"Using $configuredMemory MB for managed memory.")
-      } else {
-        LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
-                   s"memory will be allocated lazily.")
-      }
-      configuredMemory << 20 // megabytes to bytes
-    }
-    else {
-      val fraction = configuration.getFloat(
-        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
-      checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
-                           ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                           "MemoryManager fraction of the free memory must be between 0.0 and 1.0")
-
-      if (memType == MemoryType.HEAP) {
-        val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-          fraction).toLong
-
-        if (preAllocateMemory) {
-          LOG.info(s"Using $fraction of the currently free heap space for managed " +
-                     s"heap memory (${relativeMemSize >> 20} MB).")
-        } else {
-          LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
-                     s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
-        }
-
-        relativeMemSize
-      }
-      else if (memType == MemoryType.OFF_HEAP) {
-
-        // The maximum heap memory has been adjusted according to the fraction
-        val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
-        val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
-
-        if (preAllocateMemory) {
-          LOG.info(s"Using $fraction of the maximum memory size for " +
-                     s"managed off-heap memory (${directMemorySize >> 20} MB).")
-        } else {
-          LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
-                     s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
-        }
-
-        directMemorySize
-      }
-      else {
-        throw new RuntimeException("No supported memory type detected.")
-      }
-    }
-
-    // now start the memory manager
-    val memoryManager = try {
-      new MemoryManager(
-        memorySize,
-        taskManagerConfig.numberOfSlots,
-        netConfig.networkBufferSize,
-        memType,
-        preAllocateMemory)
-    }
-    catch {
-      case e: OutOfMemoryError =>
-        memType match {
-          case MemoryType.HEAP =>
-            throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-                      s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
-
-          case MemoryType.OFF_HEAP =>
-            throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-                      s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
-                      s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
-
-          case _ => throw e
-        }
-    }
-
-    // start the I/O manager last, it will create some temp directories.
-    val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)
-
-    val leaderRetrievalService = leaderRetrievalServiceOption match {
-      case Some(lrs) => lrs
-      case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
-    }
-
-    val metricsRegistry = new FlinkMetricRegistry(
-      MetricRegistryConfiguration.fromConfiguration(configuration))
-
-    (taskManagerConfig,
-      taskManagerLocation,
-      memoryManager,
-      ioManager,
-      network,
-      leaderRetrievalService,
-      metricsRegistry)
-  }
-
-
   // --------------------------------------------------------------------------
   //  Resolving the TaskManager actor
   // --------------------------------------------------------------------------
@@ -2140,239 +1953,6 @@ object TaskManager {
   // --------------------------------------------------------------------------
 
   /**
-   * Utility method to extract TaskManager config parameters from the configuration and to
-   * sanity check them.
-   *
-   * @param configuration The configuration.
-   * @param taskManagerHostname The host name under which the TaskManager communicates.
-   * @param localTaskManagerCommunication True, to skip initializing the network stack.
-   *                                      Use only in cases where only one task manager runs.
-   * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address,
-    *         memory tyep).
-   */
-  @throws(classOf[IllegalArgumentException])
-  def parseTaskManagerConfiguration(
-      configuration: Configuration,
-      taskManagerHostname: String,
-      localTaskManagerCommunication: Boolean)
-    : (TaskManagerConfiguration,
-     NetworkEnvironmentConfiguration,
-     InetSocketAddress,
-     MemoryType) = {
-
-    // ------- read values from the config and check them ---------
-    //                      (a lot of them)
-
-    // ----> hosts / ports for communication and data exchange
-
-    val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
-    checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      "Leave config parameter empty or use 0 to let the system choose a port automatically.")
-
-    val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
-    val taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport)
-
-    // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
-
-    // we need this because many configs have been written with a "-1" entry
-    val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) match {
-      case -1 => 1
-      case x => x
-    }
-
-    checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-      "Number of task slots must be at least one.")
-
-    val numNetworkBuffers = configuration.getInteger(
-      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
-    checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
-    
-    val pageSize: Int = configuration.getInteger(
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
-    // check page size of for minimum size
-    checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)
-
-    // check page size for power of two
-    checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      "Memory segment size must be a power of 2.")
-    
-    // check whether we use heap or off-heap memory
-    val memType: MemoryType = 
-      if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
-        MemoryType.OFF_HEAP
-      } else {
-        MemoryType.HEAP
-      }
-    
-    // initialize the memory segment factory accordingly
-    memType match {
-      case MemoryType.HEAP =>
-        if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
-          throw new Exception("Memory type is set to heap memory, but memory segment " +
-            "factory has been initialized for off-heap memory segments")
-        }
-
-      case MemoryType.OFF_HEAP =>
-        if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
-          throw new Exception("Memory type is set to off-heap memory, but memory segment " +
-            "factory has been initialized for heap memory segments")
-        }
-    }
-    
-    val tmpDirs = configuration.getString(
-      ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
-    .split(",|" + File.pathSeparator)
-
-    val nettyConfig = if (localTaskManagerCommunication) {
-      None
-    } else {
-      Some(
-        new NettyConfig(
-          taskManagerInetSocketAddress.getAddress(),
-          taskManagerInetSocketAddress.getPort(),
-          pageSize,
-          slots,
-          configuration)
-      )
-    }
-
-    // Default spill I/O mode for intermediate results
-    val syncOrAsync = configuration.getString(
-      ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
-
-    val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
-
-    val queryServerPort =  configuration.getInteger(
-      ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
-      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT)
-
-    val queryServerNetworkThreads =  configuration.getInteger(
-      ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
-      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS)
-
-    val queryServerQueryThreads =  configuration.getInteger(
-      ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
-      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS)
-
-    val networkConfig = NetworkEnvironmentConfiguration(
-      numNetworkBuffers,
-      pageSize,
-      memType,
-      ioMode,
-      queryServerPort,
-      queryServerNetworkThreads,
-      queryServerQueryThreads,
-      nettyConfig.getOrElse(null))
-
-    // ----> timeouts, library caching, profiling
-
-    val timeout = try {
-      AkkaUtils.getTimeout(configuration)
-    } catch {
-      case e: Exception => throw new IllegalArgumentException(
-        s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " +
-          s"Use formats like '50 s' or '1 min' to specify the timeout.")
-    }
-    LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout)
-
-    val cleanupInterval = configuration.getLong(
-      ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-      ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
-
-    val finiteRegistrationDuration = try {
-      val maxRegistrationDuration = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-        ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
-
-      if (maxRegistrationDuration.isFinite()) {
-        Some(maxRegistrationDuration.asInstanceOf[FiniteDuration])
-      } else {
-        None
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-        e)
-    }
-
-    val initialRegistrationPause = try {
-      val pause = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE
-      ))
-
-      if (pause.isFinite()) {
-        pause.asInstanceOf[FiniteDuration]
-      } else {
-        throw new IllegalArgumentException(s"The initial registration pause must be finite: $pause")
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        e)
-    }
-
-    val maxRegistrationPause = try {
-      val pause = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
-        ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE
-      ))
-
-      if (pause.isFinite()) {
-        pause.asInstanceOf[FiniteDuration]
-      } else {
-        throw new IllegalArgumentException(s"The maximum registration pause must be finite: $pause")
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        e)
-    }
-
-    val refusedRegistrationPause = try {
-      val pause = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
-        ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE
-      ))
-
-      if (pause.isFinite()) {
-        pause.asInstanceOf[FiniteDuration]
-      } else {
-        throw new IllegalArgumentException(s"The refused registration pause must be finite: $pause")
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        e)
-    }
-
-    val taskManagerConfig = TaskManagerConfiguration(
-      tmpDirs,
-      cleanupInterval,
-      timeout,
-      finiteRegistrationDuration,
-      slots,
-      configuration,
-      initialRegistrationPause,
-      maxRegistrationPause,
-      refusedRegistrationPause)
-
-    (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
-  }
-
-  /**
    * Gets the hostname and port of the JobManager from the configuration. Also checks that
    * the hostname is not null and the port non-negative.
    *
@@ -2406,71 +1986,6 @@ object TaskManager {
   // --------------------------------------------------------------------------
 
   /**
-   * Validates a condition for a config parameter and displays a standard exception, if the
-   * the condition does not hold.
-   *
-   * @param condition The condition that must hold. If the condition is false, an
-   *                  exception is thrown.
-   * @param parameter The parameter value. Will be shown in the exception message.
-   * @param name The name of the config parameter. Will be shown in the exception message.
-   * @param errorMessage The optional custom error message to append to the exception message.
-   * @throws IllegalConfigurationException Thrown if the condition is violated.
-   */
-  @throws(classOf[IllegalConfigurationException])
-  private def checkConfigParameter(
-      condition: Boolean,
-      parameter: Any,
-      name: String,
-      errorMessage: String = "")
-    : Unit = {
-    if (!condition) {
-      throw new IllegalConfigurationException(
-        s"Invalid configuration value for '$name' : $parameter - $errorMessage")
-    }
-  }
-
-  /**
-   * Validates that all the directories denoted by the strings do actually exist, are proper
-   * directories (not files), and are writable.
-   *
-   * @param tmpDirs The array of directory paths to check.
-   * @throws Exception Thrown if any of the directories does not exist or is not writable
-   *                   or is a file, rather than a directory.
-   */
-  @throws(classOf[IOException])
-  private def checkTempDirs(tmpDirs: Array[String]): Unit = {
-    tmpDirs.zipWithIndex.foreach {
-      case (dir: String, _) =>
-        val file = new File(dir)
-
-        if (!file.exists) {
-          throw new IOException(
-            s"Temporary file directory ${file.getAbsolutePath} does not exist.")
-        }
-        if (!file.isDirectory) {
-          throw new IOException(
-            s"Temporary file directory ${file.getAbsolutePath} is not a directory.")
-        }
-        if (!file.canWrite) {
-          throw new IOException(
-            s"Temporary file directory ${file.getAbsolutePath} is not writable.")
-        }
-
-        if (LOG.isInfoEnabled) {
-          val totalSpaceGb = file.getTotalSpace >>  30
-          val usableSpaceGb = file.getUsableSpace >> 30
-          val usablePercentage = usableSpaceGb.asInstanceOf[Double] / totalSpaceGb * 100
-
-          val path = file.getAbsolutePath
-
-          LOG.info(f"Temporary file directory '$path': total $totalSpaceGb GB, " +
-            f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)")
-        }
-      case (_, id) => throw new IllegalArgumentException(s"Temporary file directory #$id is null.")
-    }
-  }
-
-  /**
    * Creates the registry of default metrics, including stats about garbage collection, memory
    * usage, and system CPU load.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
deleted file mode 100644
index aab3c5f..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.configuration.Configuration
-
-import scala.concurrent.duration.FiniteDuration
-
-case class TaskManagerConfiguration(
-    tmpDirPaths: Array[String],
-    cleanupInterval: Long,
-    timeout: FiniteDuration,
-    maxRegistrationDuration: Option[FiniteDuration],
-    numberOfSlots: Int,
-    configuration: Configuration,
-    initialRegistrationPause: FiniteDuration,
-    maxRegistrationPause: FiniteDuration,
-    refusedRegistrationPause: FiniteDuration) {
-
-  def this(
-      tmpDirPaths: Array[String],
-      cleanupInterval: Long,
-      timeout: FiniteDuration,
-      maxRegistrationDuration: Option[FiniteDuration],
-      numberOfSlots: Int,
-      configuration: Configuration) {
-    this (
-      tmpDirPaths,
-      cleanupInterval,
-      timeout,
-      maxRegistrationDuration,
-      numberOfSlots,
-      configuration,
-      FiniteDuration(500, TimeUnit.MILLISECONDS),
-      FiniteDuration(30, TimeUnit.SECONDS),
-      FiniteDuration(10, TimeUnit.SECONDS))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 627a25a..500d1bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -26,6 +26,7 @@ import akka.actor.Kill;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -49,11 +50,11 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 import org.junit.Test;
 
-import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
@@ -69,7 +70,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 	public void testComponentsStartupShutdown() {
 
 		final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
-		final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+		final Time timeout = Time.seconds(100);
 		final int BUFFER_SIZE = 32 * 1024;
 
 		Configuration config = new Configuration();
@@ -93,14 +94,19 @@ public class TaskManagerComponentsStartupShutdownTest {
 				LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager),
 				StandaloneResourceManager.class);
 
+			final int numberOfSlots = 1;
+
 			// create the components for the TaskManager manually
 			final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(
-					TMP_DIR,
-					1000000,
-					timeout,
-					Option.<FiniteDuration>empty(),
-					1,
-					config);
+				numberOfSlots,
+				TMP_DIR,
+				timeout,
+				null,
+				Time.milliseconds(500),
+				Time.seconds(30),
+				Time.seconds(10),
+				1000000, // cleanup interval
+				config);
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
 					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
@@ -125,8 +131,6 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			network.start();
 
-			final int numberOfSlots = 1;
-
 			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
 
 			MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 707401b..09dc5ed 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
 import scala.language.postfixOps
 

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 5628f3c..e878097 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -243,7 +243,6 @@ object TestingUtils {
     )
   }
 
-
   def createTaskManager(
       actorSystem: ActorSystem,
       jobManagerURL: String,

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 1010432..0f82faa 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
 
 /** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.

http://git-wip-us.apache.org/repos/asf/flink/blob/009ba353/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 2ab9b20..be31085 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,8 +23,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.