You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:40:34 UTC
[07/50] [abbrv] flink git commit: [hotfix] Replace
TaskManager.createTaskManagerComponents by TaskManagerServices
[hotfix] Replace TaskManager.createTaskManagerComponents by TaskManagerServices
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e653397
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e653397
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e653397
Branch: refs/heads/flip-6
Commit: 8e653397a931497026c8ae8be05b48a108a58edf
Parents: 5219b40
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 28 14:04:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 1 09:39:28 2016 +0100
----------------------------------------------------------------------
.../clusterframework/MesosTaskManager.scala | 3 +-
.../taskexecutor/TaskManagerConfiguration.java | 25 +-
.../TaskManagerServicesConfiguration.java | 2 +-
.../minicluster/LocalFlinkMiniCluster.scala | 47 +-
.../flink/runtime/taskmanager/TaskManager.scala | 601 ++-----------------
.../taskmanager/TaskManagerConfiguration.scala | 56 --
...askManagerComponentsStartupShutdownTest.java | 24 +-
.../testingUtils/TestingTaskManager.scala | 3 +-
.../runtime/testingUtils/TestingUtils.scala | 1 -
.../flink/yarn/TestingYarnTaskManager.scala | 3 +-
.../org/apache/flink/yarn/YarnTaskManager.scala | 3 +-
11 files changed, 126 insertions(+), 642 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 3972a57..e8d6a58 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
/** An extension of the TaskManager that listens for additional Mesos-related
* messages.
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 32eb8c1..f58af77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -41,6 +41,7 @@ public class TaskManagerConfiguration {
private final String[] tmpDirPaths;
private final Time timeout;
+ // null indicates an infinite duration
private final Time maxRegistrationDuration;
private final Time initialRegistrationPause;
private final Time maxRegistrationPause;
@@ -48,6 +49,9 @@ public class TaskManagerConfiguration {
private final long cleanupInterval;
+ // TODO: remove necessity for complete configuration object
+ private final Configuration configuration;
+
public TaskManagerConfiguration(
int numberSlots,
String[] tmpDirPaths,
@@ -56,16 +60,18 @@ public class TaskManagerConfiguration {
Time initialRegistrationPause,
Time maxRegistrationPause,
Time refusedRegistrationPause,
- long cleanupInterval) {
+ long cleanupInterval,
+ Configuration configuration) {
this.numberSlots = numberSlots;
this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
this.timeout = Preconditions.checkNotNull(timeout);
- this.maxRegistrationDuration = Preconditions.checkNotNull(maxRegistrationDuration);
+ this.maxRegistrationDuration = maxRegistrationDuration;
this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
+ this.configuration = Preconditions.checkNotNull(configuration);
}
public int getNumberSlots() {
@@ -100,6 +106,10 @@ public class TaskManagerConfiguration {
return cleanupInterval;
}
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
// --------------------------------------------------------------------------------------------
// Static factory methods
// --------------------------------------------------------------------------------------------
@@ -138,7 +148,7 @@ public class TaskManagerConfiguration {
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
if (maxRegistrationDuration.isFinite()) {
- finiteRegistrationDuration = Time.seconds(maxRegistrationDuration.toSeconds());
+ finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
} else {
finiteRegistrationDuration = null;
}
@@ -153,7 +163,7 @@ public class TaskManagerConfiguration {
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
if (pause.isFinite()) {
- initialRegistrationPause = Time.seconds(pause.toSeconds());
+ initialRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
}
@@ -168,7 +178,7 @@ public class TaskManagerConfiguration {
ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
if (pause.isFinite()) {
- maxRegistrationPause = Time.seconds(pause.toSeconds());
+ maxRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
}
@@ -183,7 +193,7 @@ public class TaskManagerConfiguration {
ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
if (pause.isFinite()) {
- refusedRegistrationPause = Time.seconds(pause.toSeconds());
+ refusedRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
}
@@ -200,6 +210,7 @@ public class TaskManagerConfiguration {
initialRegistrationPause,
maxRegistrationPause,
refusedRegistrationPause,
- cleanupInterval);
+ cleanupInterval,
+ configuration);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 66d969a..80dfc09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -208,7 +208,7 @@ public class TaskManagerServicesConfiguration {
int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
- checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index cad2648..eac0a51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.minicluster
+import java.net.InetAddress
import java.util.concurrent.ExecutorService
import akka.actor.{ActorRef, ActorSystem, Props}
@@ -42,8 +43,9 @@ import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse}
import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
-import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
+import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils}
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
@@ -195,31 +197,32 @@ class LocalFlinkMiniCluster(
val resourceID = ResourceID.generate() // generate random resource id
- val (taskManagerConfig,
- taskManagerLocation,
- memoryManager,
- ioManager,
- network,
- leaderRetrievalService,
- metricsRegistry) = TaskManager.createTaskManagerComponents(
+ val taskManagerAddress = InetAddress.getByName(hostname)
+
+ val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config)
+ val taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
config,
- resourceID,
- hostname, // network interface to bind to
- localExecution, // start network stack?
- Some(createLeaderRetrievalService()))
+ taskManagerAddress,
+ localExecution)
+
+ val taskManagerServices = TaskManagerServices.fromConfiguration(
+ taskManagerServicesConfiguration,
+ resourceID)
+
+ val metricRegistry = taskManagerServices.getMetricRegistry()
val props = getTaskManagerProps(
taskManagerClass,
- taskManagerConfig,
+ taskManagerConfiguration,
resourceID,
- taskManagerLocation,
- memoryManager,
- ioManager,
- network,
- leaderRetrievalService,
- metricsRegistry)
-
- metricsRegistry.startQueryService(system, resourceID)
+ taskManagerServices.getTaskManagerLocation(),
+ taskManagerServices.getMemoryManager(),
+ taskManagerServices.getIOManager(),
+ taskManagerServices.getNetworkEnvironment,
+ createLeaderRetrievalService(),
+ metricRegistry)
+
+ metricRegistry.startQueryService(system, resourceID)
system.actorOf(props, taskManagerActorName)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c789156..0701f3a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -31,7 +31,6 @@ import grizzled.slf4j.Logger
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.clusterframework.messages.StopCluster
import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -44,12 +43,8 @@ import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInfo}
import org.apache.flink.runtime.filecache.FileCache
import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
-import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker}
-import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager}
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse}
@@ -59,17 +54,16 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.{SampleTaskSta
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.KvStateRegistry
-import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, KvStateServer}
import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
import org.apache.flink.runtime.security.SecurityContext
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{MathUtils, NetUtils}
+import org.apache.flink.util.NetUtils
import scala.collection.JavaConverters._
import scala.concurrent._
@@ -136,7 +130,7 @@ class TaskManager(
override val log = Logger(getClass)
/** The timeout for all actor ask futures */
- protected val askTimeout = new Timeout(config.timeout)
+ protected val askTimeout = new Timeout(config.getTimeout().getSize, config.getTimeout().getUnit())
/** The TaskManager's physical execution resources */
protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize())
@@ -148,7 +142,7 @@ class TaskManager(
protected val bcVarManager = new BroadcastVariableManager()
/** Handler for distributed files cached by this TaskManager */
- protected val fileCache = new FileCache(config.configuration)
+ protected val fileCache = new FileCache(config.getConfiguration())
private var taskManagerMetricGroup : TaskManagerMetricGroup = _
@@ -172,8 +166,8 @@ class TaskManager(
private val runtimeInfo = new TaskManagerRuntimeInfo(
location.getHostname(),
- new UnmodifiableConfiguration(config.configuration),
- config.tmpDirPaths)
+ new UnmodifiableConfiguration(config.getConfiguration()),
+ config.getTmpDirPaths())
private var scheduledTaskManagerRegistration: Option[Cancellable] = None
private var currentRegistrationRun: UUID = UUID.randomUUID()
@@ -587,7 +581,9 @@ class TaskManager(
)
// the next timeout computes via exponential backoff with cap
- val nextTimeout = (timeout * 2).min(config.maxRegistrationPause)
+ val nextTimeout = (timeout * 2).min(new FiniteDuration(
+ config.getMaxRegistrationPause().toMilliseconds,
+ TimeUnit.MILLISECONDS))
// schedule (with our timeout s delay) a check triggers a new registration
// attempt, if we are not registered by then
@@ -661,10 +657,14 @@ class TaskManager(
if(jobManagerAkkaURL.isDefined) {
// try the registration again after some time
- val delay: FiniteDuration = config.refusedRegistrationPause
- val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
- timeout => timeout + delay fromNow
- }
+ val delay: FiniteDuration = new FiniteDuration(
+ config.getRefusedRegistrationPause().getSize(),
+ config.getRefusedRegistrationPause().getUnit())
+ val deadline: Option[Deadline] = Option(config.getMaxRegistrationDuration())
+ .map {
+ duration => new FiniteDuration(duration.getSize(), duration.getUnit()) +
+ delay fromNow
+ }
// start a new registration run
currentRegistrationRun = UUID.randomUUID()
@@ -676,7 +676,9 @@ class TaskManager(
self ! decorateMessage(
TriggerTaskManagerRegistration(
jobManagerAkkaURL.get,
- config.initialRegistrationPause,
+ new FiniteDuration(
+ config.getInitialRegistrationPause().getSize(),
+ config.getInitialRegistrationPause().getUnit()),
deadline,
1,
currentRegistrationRun)
@@ -817,7 +819,7 @@ class TaskManager(
requestType: LogTypeRequest,
jobManager: ActorRef)
: Unit = {
- val logFilePathOption = Option(config.configuration.getString(
+ val logFilePathOption = Option(config.getConfiguration().getString(
ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")));
logFilePathOption match {
case None => throw new IOException("TaskManager log files are unavailable. " +
@@ -950,9 +952,10 @@ class TaskManager(
log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
try {
- val blobcache = new BlobCache(address, config.configuration)
+ val blobcache = new BlobCache(address, config.getConfiguration())
blobService = Option(blobcache)
- libraryCacheManager = Some(new BlobLibraryCacheManager(blobcache, config.cleanupInterval))
+ libraryCacheManager = Some(
+ new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
}
catch {
case e: Exception =>
@@ -1134,7 +1137,9 @@ class TaskManager(
tdd.getJobID,
tdd.getVertexID,
tdd.getExecutionId,
- config.timeout)
+ new FiniteDuration(
+ config.getTimeout().getSize(),
+ config.getTimeout().getUnit()))
val task = new Task(
tdd,
@@ -1408,7 +1413,8 @@ class TaskManager(
def triggerTaskManagerRegistration(): Unit = {
if(jobManagerAkkaURL.isDefined) {
// begin attempts to reconnect
- val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
+ val deadline: Option[Deadline] = Option(config.getMaxRegistrationDuration())
+ .map{ duration => new FiniteDuration(duration.getSize(), duration.getUnit()).fromNow }
// start a new registration run
currentRegistrationRun = UUID.randomUUID()
@@ -1418,7 +1424,9 @@ class TaskManager(
self ! decorateMessage(
TriggerTaskManagerRegistration(
jobManagerAkkaURL.get,
- config.initialRegistrationPause,
+ new FiniteDuration(
+ config.getInitialRegistrationPause().getSize(),
+ config.getInitialRegistrationPause().getUnit()),
deadline,
1,
currentRegistrationRun)
@@ -1825,32 +1833,37 @@ object TaskManager {
taskManagerClass: Class[_ <: TaskManager])
: ActorRef = {
- val (taskManagerConfig,
- connectionInfo,
- memoryManager,
- ioManager,
- network,
- leaderRetrievalService,
- metricsRegistry) = createTaskManagerComponents(
- configuration,
- resourceID,
- taskManagerHostname,
- localTaskManagerCommunication,
- leaderRetrievalServiceOption)
+ val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
+
+ val taskManagerServicesConfiguration = TaskManagerServicesConfiguration
+ .fromConfiguration(configuration, taskManagerAddress, false)
+
+ val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration)
+
+ val taskManagerServices = TaskManagerServices.fromConfiguration(
+ taskManagerServicesConfiguration,
+ resourceID)
+
+ val metricRegistry = taskManagerServices.getMetricRegistry()
+
+ val leaderRetrievalService = leaderRetrievalServiceOption match {
+ case Some(lrs) => lrs
+ case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+ }
// create the actor properties (which define the actor constructor parameters)
val tmProps = getTaskManagerProps(
taskManagerClass,
- taskManagerConfig,
+ taskManagerConfiguration,
resourceID,
- connectionInfo,
- memoryManager,
- ioManager,
- network,
+ taskManagerServices.getTaskManagerLocation(),
+ taskManagerServices.getMemoryManager(),
+ taskManagerServices.getIOManager(),
+ taskManagerServices.getNetworkEnvironment(),
leaderRetrievalService,
- metricsRegistry)
+ metricRegistry)
- metricsRegistry.startQueryService(actorSystem, resourceID)
+ metricRegistry.startQueryService(actorSystem, resourceID)
taskManagerActorName match {
case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1877,211 +1890,11 @@ object TaskManager {
memoryManager,
ioManager,
networkEnvironment,
- taskManagerConfig.numberOfSlots,
+ taskManagerConfig.getNumberSlots(),
leaderRetrievalService,
metricsRegistry)
}
- def createTaskManagerComponents(
- configuration: Configuration,
- resourceID: ResourceID,
- taskManagerHostname: String,
- localTaskManagerCommunication: Boolean,
- leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
- (TaskManagerConfiguration,
- TaskManagerLocation,
- MemoryManager,
- IOManager,
- NetworkEnvironment,
- LeaderRetrievalService,
- FlinkMetricRegistry) = {
-
- val (taskManagerConfig : TaskManagerConfiguration,
- netConfig: NetworkEnvironmentConfiguration,
- taskManagerAddress: InetSocketAddress,
- memType: MemoryType
- ) = parseTaskManagerConfiguration(
- configuration,
- taskManagerHostname,
- localTaskManagerCommunication)
-
- // pre-start checks
- checkTempDirs(taskManagerConfig.tmpDirPaths)
-
- val networkBufferPool = new NetworkBufferPool(
- netConfig.numNetworkBuffers,
- netConfig.networkBufferSize,
- netConfig.memoryType)
-
- val connectionManager = Option(netConfig.nettyConfig) match {
- case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
- case None => new LocalConnectionManager()
- }
-
- val resultPartitionManager = new ResultPartitionManager()
- val taskEventDispatcher = new TaskEventDispatcher()
-
- val kvStateRegistry = new KvStateRegistry()
-
- val kvStateServer = Option(netConfig.nettyConfig) match {
- case Some(nettyConfig) =>
-
- val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
- nettyConfig.getNumberOfSlots
- } else {
- netConfig.queryServerNetworkThreads
- }
-
- val numQueryThreads = if (netConfig.queryServerQueryThreads == 0) {
- nettyConfig.getNumberOfSlots
- } else {
- netConfig.queryServerQueryThreads
- }
-
- new KvStateServer(
- taskManagerAddress.getAddress(),
- netConfig.queryServerPort,
- numNetworkThreads,
- numQueryThreads,
- kvStateRegistry,
- new DisabledKvStateRequestStats())
-
- case None => null
- }
-
- // we start the network first, to make sure it can allocate its buffers first
- val network = new NetworkEnvironment(
- networkBufferPool,
- connectionManager,
- resultPartitionManager,
- taskEventDispatcher,
- kvStateRegistry,
- kvStateServer,
- netConfig.ioMode,
- netConfig.partitionRequestInitialBackoff,
- netConfig.partitinRequestMaxBackoff)
-
- network.start()
-
- val taskManagerLocation = new TaskManagerLocation(
- resourceID,
- taskManagerAddress.getAddress(),
- network.getConnectionManager().getDataPort())
-
- // computing the amount of memory to use depends on how much memory is available
- // it strictly needs to happen AFTER the network stack has been initialized
-
- // check if a value has been configured
- val configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
- checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
- ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
- "MemoryManager needs at least one MB of memory. " +
- "If you leave this config parameter empty, the system automatically " +
- "pick a fraction of the available memory.")
-
-
- val preAllocateMemory = configuration.getBoolean(
- ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE)
-
- val memorySize = if (configuredMemory > 0) {
- if (preAllocateMemory) {
- LOG.info(s"Using $configuredMemory MB for managed memory.")
- } else {
- LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
- s"memory will be allocated lazily.")
- }
- configuredMemory << 20 // megabytes to bytes
- }
- else {
- val fraction = configuration.getFloat(
- ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
- checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
- ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- "MemoryManager fraction of the free memory must be between 0.0 and 1.0")
-
- if (memType == MemoryType.HEAP) {
- val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
- fraction).toLong
-
- if (preAllocateMemory) {
- LOG.info(s"Using $fraction of the currently free heap space for managed " +
- s"heap memory (${relativeMemSize >> 20} MB).")
- } else {
- LOG.info(s"Limiting managed memory to $fraction of the currently free heap space " +
- s"(${relativeMemSize >> 20} MB), memory will be allocated lazily.")
- }
-
- relativeMemSize
- }
- else if (memType == MemoryType.OFF_HEAP) {
-
- // The maximum heap memory has been adjusted according to the fraction
- val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
- val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
-
- if (preAllocateMemory) {
- LOG.info(s"Using $fraction of the maximum memory size for " +
- s"managed off-heap memory (${directMemorySize >> 20} MB).")
- } else {
- LOG.info(s"Limiting managed memory to $fraction of the maximum memory size " +
- s"(${directMemorySize >> 20} MB), memory will be allocated lazily.")
- }
-
- directMemorySize
- }
- else {
- throw new RuntimeException("No supported memory type detected.")
- }
- }
-
- // now start the memory manager
- val memoryManager = try {
- new MemoryManager(
- memorySize,
- taskManagerConfig.numberOfSlots,
- netConfig.networkBufferSize,
- memType,
- preAllocateMemory)
- }
- catch {
- case e: OutOfMemoryError =>
- memType match {
- case MemoryType.HEAP =>
- throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
- s" while allocating the TaskManager heap memory ($memorySize bytes).", e)
-
- case MemoryType.OFF_HEAP =>
- throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
- s" while allocating the TaskManager off-heap memory ($memorySize bytes). " +
- s"Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e)
-
- case _ => throw e
- }
- }
-
- // start the I/O manager last, it will create some temp directories.
- val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)
-
- val leaderRetrievalService = leaderRetrievalServiceOption match {
- case Some(lrs) => lrs
- case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
- }
-
- val metricsRegistry = new FlinkMetricRegistry(
- MetricRegistryConfiguration.fromConfiguration(configuration))
-
- (taskManagerConfig,
- taskManagerLocation,
- memoryManager,
- ioManager,
- network,
- leaderRetrievalService,
- metricsRegistry)
- }
-
-
// --------------------------------------------------------------------------
// Resolving the TaskManager actor
// --------------------------------------------------------------------------
@@ -2121,239 +1934,6 @@ object TaskManager {
// --------------------------------------------------------------------------
/**
- * Utility method to extract TaskManager config parameters from the configuration and to
- * sanity check them.
- *
- * @param configuration The configuration.
- * @param taskManagerHostname The host name under which the TaskManager communicates.
- * @param localTaskManagerCommunication True, to skip initializing the network stack.
- * Use only in cases where only one task manager runs.
- * @return A tuple (TaskManagerConfiguration, network configuration, inet socket address,
- * memory tyep).
- */
- @throws(classOf[IllegalArgumentException])
- def parseTaskManagerConfiguration(
- configuration: Configuration,
- taskManagerHostname: String,
- localTaskManagerCommunication: Boolean)
- : (TaskManagerConfiguration,
- NetworkEnvironmentConfiguration,
- InetSocketAddress,
- MemoryType) = {
-
- // ------- read values from the config and check them ---------
- // (a lot of them)
-
- // ----> hosts / ports for communication and data exchange
-
- val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
- checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- "Leave config parameter empty or use 0 to let the system choose a port automatically.")
-
- val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
- val taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport)
-
- // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
-
- // we need this because many configs have been written with a "-1" entry
- val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) match {
- case -1 => 1
- case x => x
- }
-
- checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
- "Number of task slots must be at least one.")
-
- val numNetworkBuffers = configuration.getInteger(
- ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
- checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
- ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
-
- val pageSize: Int = configuration.getInteger(
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
- // check page size of for minimum size
- checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)
-
- // check page size for power of two
- checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- "Memory segment size must be a power of 2.")
-
- // check whether we use heap or off-heap memory
- val memType: MemoryType =
- if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
- MemoryType.OFF_HEAP
- } else {
- MemoryType.HEAP
- }
-
- // initialize the memory segment factory accordingly
- memType match {
- case MemoryType.HEAP =>
- if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
- throw new Exception("Memory type is set to heap memory, but memory segment " +
- "factory has been initialized for off-heap memory segments")
- }
-
- case MemoryType.OFF_HEAP =>
- if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
- throw new Exception("Memory type is set to off-heap memory, but memory segment " +
- "factory has been initialized for heap memory segments")
- }
- }
-
- val tmpDirs = configuration.getString(
- ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
- .split(",|" + File.pathSeparator)
-
- val nettyConfig = if (localTaskManagerCommunication) {
- None
- } else {
- Some(
- new NettyConfig(
- taskManagerInetSocketAddress.getAddress(),
- taskManagerInetSocketAddress.getPort(),
- pageSize,
- slots,
- configuration)
- )
- }
-
- // Default spill I/O mode for intermediate results
- val syncOrAsync = configuration.getString(
- ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
-
- val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
-
- val queryServerPort = configuration.getInteger(
- ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
- ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT)
-
- val queryServerNetworkThreads = configuration.getInteger(
- ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
- ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS)
-
- val queryServerQueryThreads = configuration.getInteger(
- ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
- ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS)
-
- val networkConfig = NetworkEnvironmentConfiguration(
- numNetworkBuffers,
- pageSize,
- memType,
- ioMode,
- queryServerPort,
- queryServerNetworkThreads,
- queryServerQueryThreads,
- nettyConfig.getOrElse(null))
-
- // ----> timeouts, library caching, profiling
-
- val timeout = try {
- AkkaUtils.getTimeout(configuration)
- } catch {
- case e: Exception => throw new IllegalArgumentException(
- s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " +
- s"Use formats like '50 s' or '1 min' to specify the timeout.")
- }
- LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout)
-
- val cleanupInterval = configuration.getLong(
- ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
- ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
-
- val finiteRegistrationDuration = try {
- val maxRegistrationDuration = Duration(configuration.getString(
- ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
- ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
-
- if (maxRegistrationDuration.isFinite()) {
- Some(maxRegistrationDuration.asInstanceOf[FiniteDuration])
- } else {
- None
- }
- } catch {
- case e: NumberFormatException => throw new IllegalArgumentException(
- "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
- e)
- }
-
- val initialRegistrationPause = try {
- val pause = Duration(configuration.getString(
- ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
- ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE
- ))
-
- if (pause.isFinite()) {
- pause.asInstanceOf[FiniteDuration]
- } else {
- throw new IllegalArgumentException(s"The initial registration pause must be finite: $pause")
- }
- } catch {
- case e: NumberFormatException => throw new IllegalArgumentException(
- "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
- e)
- }
-
- val maxRegistrationPause = try {
- val pause = Duration(configuration.getString(
- ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
- ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE
- ))
-
- if (pause.isFinite()) {
- pause.asInstanceOf[FiniteDuration]
- } else {
- throw new IllegalArgumentException(s"The maximum registration pause must be finite: $pause")
- }
- } catch {
- case e: NumberFormatException => throw new IllegalArgumentException(
- "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
- e)
- }
-
- val refusedRegistrationPause = try {
- val pause = Duration(configuration.getString(
- ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
- ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE
- ))
-
- if (pause.isFinite()) {
- pause.asInstanceOf[FiniteDuration]
- } else {
- throw new IllegalArgumentException(s"The refused registration pause must be finite: $pause")
- }
- } catch {
- case e: NumberFormatException => throw new IllegalArgumentException(
- "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
- e)
- }
-
- val taskManagerConfig = TaskManagerConfiguration(
- tmpDirs,
- cleanupInterval,
- timeout,
- finiteRegistrationDuration,
- slots,
- configuration,
- initialRegistrationPause,
- maxRegistrationPause,
- refusedRegistrationPause)
-
- (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
- }
-
- /**
* Gets the protocol, hostname and port of the JobManager from the configuration. Also checks that
* the hostname is not null and the port non-negative.
*
@@ -2389,67 +1969,6 @@ object TaskManager {
// --------------------------------------------------------------------------
/**
- * Validates a condition for a config parameter and displays a standard exception, if the
- * the condition does not hold.
- *
- * @param condition The condition that must hold. If the condition is false, an
- * exception is thrown.
- * @param parameter The parameter value. Will be shown in the exception message.
- * @param name The name of the config parameter. Will be shown in the exception message.
- * @param errorMessage The optional custom error message to append to the exception message.
- * @throws IllegalConfigurationException Thrown if the condition is violated.
- */
- @throws(classOf[IllegalConfigurationException])
- private def checkConfigParameter(
- condition: Boolean,
- parameter: Any,
- name: String,
- errorMessage: String = "")
- : Unit = {
- if (!condition) {
- throw new IllegalConfigurationException(
- s"Invalid configuration value for '$name' : $parameter - $errorMessage")
- }
- }
-
- /**
- * Validates that all the directories denoted by the strings do actually exist, are proper
- * directories (not files), and are writable.
- *
- * @param tmpDirs The array of directory paths to check.
- * @throws Exception Thrown if any of the directories does not exist or is not writable
- * or is a file, rather than a directory.
- */
- @throws(classOf[IOException])
- private def checkTempDirs(tmpDirs: Array[String]): Unit = {
- tmpDirs.zipWithIndex.foreach {
- case (dir: String, _) =>
- val file = new File(dir)
-
- if (!file.exists) {
- throw new IOException(
- s"Temporary file directory ${file.getAbsolutePath} does not exist.")
- }
- if (!file.isDirectory) {
- throw new IOException(
- s"Temporary file directory ${file.getAbsolutePath} is not a directory.")
- }
- if (!file.canWrite) {
- throw new IOException(
- s"Temporary file directory ${file.getAbsolutePath} is not writable.")
- }
-
- if (LOG.isInfoEnabled) {
- val totalSpaceGb = file.getTotalSpace >> 30
- val usableSpaceGb = file.getUsableSpace >> 30
- val usablePercentage = usableSpaceGb.asInstanceOf[Double] / totalSpaceGb * 100
-
- val path = file.getAbsolutePath
-
- LOG.info(f"Temporary file directory '$path': total $totalSpaceGb GB, " +
- f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)")
- }
- case (_, id) => throw new IllegalArgumentException(s"Temporary file directory #$id is null.")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
deleted file mode 100644
index aab3c5f..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.configuration.Configuration
-
-import scala.concurrent.duration.FiniteDuration
-
-case class TaskManagerConfiguration(
- tmpDirPaths: Array[String],
- cleanupInterval: Long,
- timeout: FiniteDuration,
- maxRegistrationDuration: Option[FiniteDuration],
- numberOfSlots: Int,
- configuration: Configuration,
- initialRegistrationPause: FiniteDuration,
- maxRegistrationPause: FiniteDuration,
- refusedRegistrationPause: FiniteDuration) {
-
- def this(
- tmpDirPaths: Array[String],
- cleanupInterval: Long,
- timeout: FiniteDuration,
- maxRegistrationDuration: Option[FiniteDuration],
- numberOfSlots: Int,
- configuration: Configuration) {
- this (
- tmpDirPaths,
- cleanupInterval,
- timeout,
- maxRegistrationDuration,
- numberOfSlots,
- configuration,
- FiniteDuration(500, TimeUnit.MILLISECONDS),
- FiniteDuration(30, TimeUnit.SECONDS),
- FiniteDuration(10, TimeUnit.SECONDS))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 627a25a..500d1bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -26,6 +26,7 @@ import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
@@ -49,11 +50,11 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.junit.Test;
-import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import java.net.InetAddress;
@@ -69,7 +70,7 @@ public class TaskManagerComponentsStartupShutdownTest {
public void testComponentsStartupShutdown() {
final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
- final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+ final Time timeout = Time.seconds(100);
final int BUFFER_SIZE = 32 * 1024;
Configuration config = new Configuration();
@@ -93,14 +94,19 @@ public class TaskManagerComponentsStartupShutdownTest {
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager),
StandaloneResourceManager.class);
+ final int numberOfSlots = 1;
+
// create the components for the TaskManager manually
final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(
- TMP_DIR,
- 1000000,
- timeout,
- Option.<FiniteDuration>empty(),
- 1,
- config);
+ numberOfSlots,
+ TMP_DIR,
+ timeout,
+ null,
+ Time.milliseconds(500),
+ Time.seconds(30),
+ Time.seconds(10),
+ 1000000, // cleanup interval
+ config);
final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
@@ -125,8 +131,6 @@ public class TaskManagerComponentsStartupShutdownTest {
network.start();
- final int numberOfSlots = 1;
-
LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 707401b..09dc5ed 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
import scala.language.postfixOps
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 73fb928..dba8834 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -242,7 +242,6 @@ object TestingUtils {
)
}
-
def createTaskManager(
actorSystem: ActorSystem,
jobManagerURL: String,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 1010432..0f82faa 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
http://git-wip-us.apache.org/repos/asf/flink/blob/8e653397/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 2ab9b20..be31085 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,8 +23,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
/** An extension of the TaskManager that listens for additional YARN related
* messages.