You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:49:02 UTC
[42/50] [abbrv] flink git commit: [FLINK-4505] [cluster mngt] Implement TaskManager component's startup
[FLINK-4505] [cluster mngt] Implement TaskManager component's startup
The TaskManagerRunner now contains the startup logic for the TaskManager's components.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdc3a0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdc3a0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdc3a0a7
Branch: refs/heads/flip-6
Commit: bdc3a0a7d55eb2be215b3ef8324a09c9c344c5fb
Parents: d5dff4b
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Fri Sep 2 18:00:49 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:45 2016 +0200
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 766 +------------------
.../runtime/taskmanager/TaskManagerRunner.java | 749 ++++++++++++++++++
.../runtime/taskexecutor/TaskExecutorTest.java | 53 +-
3 files changed, 804 insertions(+), 764 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bdc3a0a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9d9ad2a..8ce2780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,74 +18,29 @@
package org.apache.flink.runtime.taskexecutor;
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
import org.jboss.netty.channel.ChannelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskmanager.MemoryLogger;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.NetUtils;
-
-import scala.Tuple2;
-import scala.Option;
-import scala.Some;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -98,12 +53,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
- /** The unique resource ID of this TaskExecutor */
- private final ResourceID resourceID;
-
+ /** The connection information of this task manager */
private final TaskManagerLocation taskManagerLocation;
- /** The access to the leader election and metadata storage services */
+ /** The access to the leader election and retrieval services */
private final HighAvailabilityServices haServices;
/** The task manager configuration */
@@ -128,28 +81,26 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ------------------------------------------------------------------------
public TaskExecutor(
- TaskExecutorConfiguration taskExecutorConfig,
- ResourceID resourceID,
- TaskManagerLocation taskManagerLocation,
- MemoryManager memoryManager,
- IOManager ioManager,
- NetworkEnvironment networkEnvironment,
- RpcService rpcService,
- HighAvailabilityServices haServices) {
+ TaskExecutorConfiguration taskExecutorConfig,
+ TaskManagerLocation taskManagerLocation,
+ RpcService rpcService,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ HighAvailabilityServices haServices) {
super(rpcService);
checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
- this.resourceID = checkNotNull(resourceID);
this.taskManagerLocation = checkNotNull(taskManagerLocation);
this.memoryManager = checkNotNull(memoryManager);
this.ioManager = checkNotNull(ioManager);
this.networkEnvironment = checkNotNull(networkEnvironment);
this.haServices = checkNotNull(haServices);
- this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
+ this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
}
// ------------------------------------------------------------------------
@@ -207,7 +158,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- /**
* Requests a slot from the TaskManager
*
* @param allocationID id for the request
@@ -220,126 +170,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
/**
- * Starts and runs the TaskManager.
- * <p/>
- * This method first tries to select the network interface to use for the TaskManager
- * communication. The network interface is used both for the actor communication
- * (coordination) as well as for the data exchange between task managers. Unless
- * the hostname/interface is explicitly configured in the configuration, this
- * method will try out various interfaces and methods to connect to the JobManager
- * and select the one where the connection attempt is successful.
- * <p/>
- * After selecting the network interface, this method brings up an actor system
- * for the TaskManager and its actors, starts the TaskManager's services
- * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
- *
- * @param configuration The configuration for the TaskManager.
- * @param resourceID The id of the resource which the task manager will run on.
- */
- public static void selectNetworkInterfaceAndRunTaskManager(
- Configuration configuration,
- ResourceID resourceID) throws Exception {
-
- final InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
-
- runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
- }
-
- private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
- throws Exception {
- String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
- if (taskManagerHostname != null) {
- LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
- } else {
- LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
- InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
- taskManagerHostname = taskManagerAddress.getHostName();
- LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
- taskManagerHostname, taskManagerAddress.getHostAddress());
- }
-
- // if no task manager port has been configured, use 0 (system will pick any free port)
- final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
- if (actorSystemPort < 0 || actorSystemPort > 65535) {
- throw new IllegalConfigurationException("Invalid value for '" +
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
- "' (port for the TaskManager actor system) : " + actorSystemPort +
- " - Leave config parameter empty or use 0 to let the system choose a port automatically.");
- }
-
- return new InetSocketAddress(taskManagerHostname, actorSystemPort);
- }
-
- /**
- * Starts and runs the TaskManager. Brings up an actor system for the TaskManager and its
- * actors, starts the TaskManager's services (library cache, shuffle network stack, ...),
- * and starts the TaskManager itself.
- * <p/>
- * This method will also spawn a process reaper for the TaskManager (kill the process if
- * the actor fails) and optionally start the JVM memory logging thread.
- *
- * @param taskManagerHostname The hostname/address of the interface where the actor system
- * will communicate.
- * @param resourceID The id of the resource which the task manager will run on.
- * @param actorSystemPort The port at which the actor system will communicate.
- * @param configuration The configuration for the TaskManager.
- */
- private static void runTaskManager(
- String taskManagerHostname,
- ResourceID resourceID,
- int actorSystemPort,
- final Configuration configuration) throws Exception {
-
- LOG.info("Starting TaskManager");
-
- // Bring up the TaskManager actor system first, bind it to the given address.
-
- LOG.info("Starting TaskManager actor system at " +
- NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
-
- final ActorSystem taskManagerSystem;
- try {
- Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
- Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
- LOG.debug("Using akka configuration\n " + akkaConfig);
- taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
- } catch (Throwable t) {
- if (t instanceof ChannelException) {
- Throwable cause = t.getCause();
- if (cause != null && t.getCause() instanceof BindException) {
- String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
- throw new IOException("Unable to bind TaskManager actor system to address " +
- address + " - " + cause.getMessage(), t);
- }
- }
- throw new Exception("Could not create TaskManager actor system", t);
- }
-
- // start akka rpc service based on actor system
- final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
- final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
-
- // start high availability service to implement getResourceManagerLeaderRetriever method only
- final HighAvailabilityServices haServices = new HighAvailabilityServices() {
- @Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
- return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- }
-
- @Override
public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
return null;
}
@Override
- public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
- return null;
- }
-
- @Override
- public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
return null;
}
@@ -350,552 +185,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
- return null;
- }
- };
-
- // start all the TaskManager services (network stack, library cache, ...)
- // and the TaskManager actor
- try {
- LOG.info("Starting TaskManager actor");
- TaskExecutor taskExecutor = startTaskManagerComponentsAndActor(
- configuration,
- resourceID,
- akkaRpcService,
- taskManagerHostname,
- haServices,
- false);
-
- taskExecutor.start();
-
- // if desired, start the logging daemon that periodically logs the memory usage information
- if (LOG.isInfoEnabled() && configuration.getBoolean(
- ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
- ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
- LOG.info("Starting periodic memory usage logger");
-
- long interval = configuration.getLong(
- ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
- ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
-
- MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem);
- logger.start();
- }
-
- // block until everything is done
- taskManagerSystem.awaitTermination();
- } catch (Throwable t) {
- LOG.error("Error while starting up taskManager", t);
- try {
- taskManagerSystem.shutdown();
- } catch (Throwable tt) {
- LOG.warn("Could not cleanly shut down actor system", tt);
- }
- throw t;
- }
- }
-
- // --------------------------------------------------------------------------
- // Starting and running the TaskManager
- // --------------------------------------------------------------------------
-
- /**
- * @param configuration The configuration for the TaskManager.
- * @param resourceID The id of the resource which the task manager will run on.
- * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
- * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
- * @param haServices Optionally, a high availability service can be provided. If none is given,
- * then a HighAvailabilityServices is constructed from the configuration.
- * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
- * @return An ActorRef to the TaskManager actor.
- * @throws IllegalConfigurationException Thrown, if the given config contains illegal values.
- * @throws IOException Thrown, if any of the I/O components (such as buffer pools,
- * I/O manager, ...) cannot be properly started.
- * @throws Exception Thrown is some other error occurs while parsing the configuration
- * or starting the TaskManager components.
- */
- public static TaskExecutor startTaskManagerComponentsAndActor(
- Configuration configuration,
- ResourceID resourceID,
- RpcService rpcService,
- String taskManagerHostname,
- HighAvailabilityServices haServices,
- boolean localTaskManagerCommunication) throws Exception {
-
- final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration(
- configuration, taskManagerHostname, localTaskManagerCommunication);
-
- TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
- resourceID,
- InetAddress.getByName(taskManagerHostname),
- taskExecutorConfig,
- configuration);
-
- final TaskExecutor taskExecutor = new TaskExecutor(
- taskExecutorConfig,
- resourceID,
- taskManagerComponents.getTaskManagerLocation(),
- taskManagerComponents.getMemoryManager(),
- taskManagerComponents.getIOManager(),
- taskManagerComponents.getNetworkEnvironment(),
- rpcService,
- haServices);
-
- return taskExecutor;
- }
-
- /**
- * Creates and returns the task manager components.
- *
- * @param resourceID resource ID of the task manager
- * @param taskManagerAddress address of the task manager
- * @param taskExecutorConfig task manager configuration
- * @param configuration of Flink
- * @return task manager components
- * @throws Exception
- */
- private static TaskExecutor.TaskManagerComponents createTaskManagerComponents(
- ResourceID resourceID,
- InetAddress taskManagerAddress,
- TaskExecutorConfiguration taskExecutorConfig,
- Configuration configuration) throws Exception {
- MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
-
- // pre-start checks
- checkTempDirs(taskExecutorConfig.getTmpDirPaths());
-
- NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
-
- NetworkBufferPool networkBufferPool = new NetworkBufferPool(
- networkEnvironmentConfiguration.numNetworkBuffers(),
- networkEnvironmentConfiguration.networkBufferSize(),
- networkEnvironmentConfiguration.memoryType());
-
- ConnectionManager connectionManager;
-
- if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
- connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
- } else {
- connectionManager = new LocalConnectionManager();
- }
-
- ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
- TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
-
- KvStateRegistry kvStateRegistry = new KvStateRegistry();
-
- KvStateServer kvStateServer;
-
- if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
- NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
-
- int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
- nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
-
- int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
- nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
-
- kvStateServer = new KvStateServer(
- taskManagerAddress,
- networkEnvironmentConfiguration.queryServerPort(),
- numNetworkThreads,
- numQueryThreads,
- kvStateRegistry,
- new DisabledKvStateRequestStats());
- } else {
- kvStateServer = null;
- }
-
- // we start the network first, to make sure it can allocate its buffers first
- final NetworkEnvironment network = new NetworkEnvironment(
- networkBufferPool,
- connectionManager,
- resultPartitionManager,
- taskEventDispatcher,
- kvStateRegistry,
- kvStateServer,
- networkEnvironmentConfiguration.ioMode(),
- networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
- networkEnvironmentConfiguration.partitinRequestMaxBackoff());
-
- network.start();
-
- TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
- resourceID,
- taskManagerAddress,
- network.getConnectionManager().getDataPort());
-
- // computing the amount of memory to use depends on how much memory is available
- // it strictly needs to happen AFTER the network stack has been initialized
-
- // check if a value has been configured
- long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
- checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
- ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
- "MemoryManager needs at least one MB of memory. " +
- "If you leave this config parameter empty, the system automatically " +
- "pick a fraction of the available memory.");
-
- final long memorySize;
- boolean preAllocateMemory = configuration.getBoolean(
- ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
- if (configuredMemory > 0) {
- if (preAllocateMemory) {
- LOG.info("Using {} MB for managed memory." , configuredMemory);
- } else {
- LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
- }
- memorySize = configuredMemory << 20; // megabytes to bytes
- } else {
- float fraction = configuration.getFloat(
- ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
- checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
- ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
- "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
-
- if (memType == MemoryType.HEAP) {
- long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
- if (preAllocateMemory) {
- LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
- fraction , relativeMemSize >> 20);
- } else {
- LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
- "memory will be allocated lazily." , fraction , relativeMemSize >> 20);
- }
- memorySize = relativeMemSize;
- } else if (memType == MemoryType.OFF_HEAP) {
- // The maximum heap memory has been adjusted according to the fraction
- long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
- long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
- if (preAllocateMemory) {
- LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
- fraction, directMemorySize >> 20);
- } else {
- LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
- " memory will be allocated lazily.", fraction, directMemorySize >> 20);
- }
- memorySize = directMemorySize;
- } else {
- throw new RuntimeException("No supported memory type detected.");
- }
- }
-
- // now start the memory manager
- final MemoryManager memoryManager;
- try {
- memoryManager = new MemoryManager(
- memorySize,
- taskExecutorConfig.getNumberOfSlots(),
- taskExecutorConfig.getNetworkConfig().networkBufferSize(),
- memType,
- preAllocateMemory);
- } catch (OutOfMemoryError e) {
- if (memType == MemoryType.HEAP) {
- throw new Exception("OutOfMemory error (" + e.getMessage() +
- ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
- } else if (memType == MemoryType.OFF_HEAP) {
- throw new Exception("OutOfMemory error (" + e.getMessage() +
- ") while allocating the TaskManager off-heap memory (" + memorySize +
- " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
- } else {
- throw e;
- }
- }
-
- // start the I/O manager, it will create some temp directories.
- final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
-
- return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
- }
-
- // --------------------------------------------------------------------------
- // Parsing and checking the TaskManager Configuration
- // --------------------------------------------------------------------------
-
- /**
- * Utility method to extract TaskManager config parameters from the configuration and to
- * sanity check them.
- *
- * @param configuration The configuration.
- * @param taskManagerHostname The host name under which the TaskManager communicates.
- * @param localTaskManagerCommunication True, to skip initializing the network stack.
- * Use only in cases where only one task manager runs.
- * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
- */
- private static TaskExecutorConfiguration parseTaskManagerConfiguration(
- Configuration configuration,
- String taskManagerHostname,
- boolean localTaskManagerCommunication) throws Exception {
-
- // ------- read values from the config and check them ---------
- // (a lot of them)
-
- // ----> hosts / ports for communication and data exchange
-
- int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
- if (dataport == 0) {
- dataport = NetUtils.getAvailablePort();
- }
- checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- "Leave config parameter empty or use 0 to let the system choose a port automatically.");
-
- InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
- final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
-
- // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
-
- // we need this because many configs have been written with a "-1" entry
- int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
- if (slots == -1) {
- slots = 1;
- }
- checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
- "Number of task slots must be at least one.");
-
- final int numNetworkBuffers = configuration.getInteger(
- ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
- checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
- ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
-
- final int pageSize = configuration.getInteger(
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
- // check page size of for minimum size
- checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
- // check page size for power of two
- checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- "Memory segment size must be a power of 2.");
-
- // check whether we use heap or off-heap memory
- final MemoryType memType;
- if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
- memType = MemoryType.OFF_HEAP;
- } else {
- memType = MemoryType.HEAP;
- }
-
- // initialize the memory segment factory accordingly
- if (memType == MemoryType.HEAP) {
- if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
- throw new Exception("Memory type is set to heap memory, but memory segment " +
- "factory has been initialized for off-heap memory segments");
- }
- } else {
- if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
- throw new Exception("Memory type is set to off-heap memory, but memory segment " +
- "factory has been initialized for heap memory segments");
- }
- }
-
- final String[] tmpDirs = configuration.getString(
- ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
-
- final NettyConfig nettyConfig;
- if (!localTaskManagerCommunication) {
- nettyConfig = new NettyConfig(
- taskManagerInetSocketAddress.getAddress(),
- taskManagerInetSocketAddress.getPort(),
- pageSize,
- slots,
- configuration);
- } else {
- nettyConfig = null;
- }
-
- // Default spill I/O mode for intermediate results
- final String syncOrAsync = configuration.getString(
- ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
-
- final IOMode ioMode;
- if (syncOrAsync.equals("async")) {
- ioMode = IOManager.IOMode.ASYNC;
- } else {
- ioMode = IOManager.IOMode.SYNC;
- }
-
- final int queryServerPort = configuration.getInteger(
- ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
- ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
-
- final int queryServerNetworkThreads = configuration.getInteger(
- ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
- ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
-
- final int queryServerQueryThreads = configuration.getInteger(
- ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
- ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
-
- final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
- numNetworkBuffers,
- pageSize,
- memType,
- ioMode,
- queryServerPort,
- queryServerNetworkThreads,
- queryServerQueryThreads,
- Option.apply(nettyConfig),
- 500,
- 30000);
-
- // ----> timeouts, library caching, profiling
-
- final FiniteDuration timeout;
- try {
- timeout = AkkaUtils.getTimeout(configuration);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
- "'.Use formats like '50 s' or '1 min' to specify the timeout.");
- }
- LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
-
- final long cleanupInterval = configuration.getLong(
- ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
- ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
- final FiniteDuration finiteRegistrationDuration;
- try {
- Duration maxRegistrationDuration = Duration.create(configuration.getString(
- ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
- ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
- if (maxRegistrationDuration.isFinite()) {
- finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
- } else {
- finiteRegistrationDuration = null;
- }
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid format for parameter " +
- ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
- }
-
- final FiniteDuration initialRegistrationPause;
- try {
- Duration pause = Duration.create(configuration.getString(
- ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
- ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
- if (pause.isFinite()) {
- initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
- } else {
- throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
- }
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid format for parameter " +
- ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
- }
-
- final FiniteDuration maxRegistrationPause;
- try {
- Duration pause = Duration.create(configuration.getString(
- ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
- ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
- if (pause.isFinite()) {
- maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
- } else {
- throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
- }
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid format for parameter " +
- ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
- }
-
- final FiniteDuration refusedRegistrationPause;
- try {
- Duration pause = Duration.create(configuration.getString(
- ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
- ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
- if (pause.isFinite()) {
- refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
- } else {
- throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
- }
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid format for parameter " +
- ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
- }
-
- return new TaskExecutorConfiguration(
- tmpDirs,
- cleanupInterval,
- networkConfig,
- timeout,
- finiteRegistrationDuration,
- slots,
- configuration,
- initialRegistrationPause,
- maxRegistrationPause,
- refusedRegistrationPause);
- }
-
- /**
- * Validates a condition for a config parameter and displays a standard exception, if the
- * the condition does not hold.
- *
- * @param condition The condition that must hold. If the condition is false, an exception is thrown.
- * @param parameter The parameter value. Will be shown in the exception message.
- * @param name The name of the config parameter. Will be shown in the exception message.
- * @param errorMessage The optional custom error message to append to the exception message.
- */
- private static void checkConfigParameter(
- boolean condition,
- Object parameter,
- String name,
- String errorMessage) {
- if (!condition) {
- throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
- }
- }
-
- /**
- * Validates that all the directories denoted by the strings do actually exist, are proper
- * directories (not files), and are writable.
- *
- * @param tmpDirs The array of directory paths to check.
- * @throws Exception Thrown if any of the directories does not exist or is not writable
- * or is a file, rather than a directory.
- */
- private static void checkTempDirs(String[] tmpDirs) throws IOException {
- for (String dir : tmpDirs) {
- if (dir != null && !dir.equals("")) {
- File file = new File(dir);
- if (!file.exists()) {
- throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
- }
- if (!file.isDirectory()) {
- throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
- }
- if (!file.canWrite()) {
- throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
- }
-
- if (LOG.isInfoEnabled()) {
- long totalSpaceGb = file.getTotalSpace() >> 30;
- long usableSpaceGb = file.getUsableSpace() >> 30;
- double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100;
- String path = file.getAbsolutePath();
- LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
- path, totalSpaceGb, usableSpaceGb, usablePercentage));
- }
- } else {
- throw new IllegalArgumentException("Temporary file directory #$id is null.");
- }
- }
- }
-
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
public ResourceID getResourceID() {
- return resourceID;
+ return taskManagerLocation.getResourceID();
}
// ------------------------------------------------------------------------
@@ -959,37 +254,4 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- private static class TaskManagerComponents {
- private final TaskManagerLocation taskManagerLocation;
- private final MemoryManager memoryManager;
- private final IOManager ioManager;
- private final NetworkEnvironment networkEnvironment;
-
- private TaskManagerComponents(
- TaskManagerLocation taskManagerLocation,
- MemoryManager memoryManager,
- IOManager ioManager,
- NetworkEnvironment networkEnvironment) {
- this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
- this.memoryManager = Preconditions.checkNotNull(memoryManager);
- this.ioManager = Preconditions.checkNotNull(ioManager);
- this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
- }
-
- public MemoryManager getMemoryManager() {
- return memoryManager;
- }
-
- public IOManager getIOManager() {
- return ioManager;
- }
-
- public NetworkEnvironment getNetworkEnvironment() {
- return networkEnvironment;
- }
-
- public TaskManagerLocation getTaskManagerLocation() {
- return taskManagerLocation;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdc3a0a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
new file mode 100644
index 0000000..4f756fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRunner.java
@@ -0,0 +1,749 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is the executable entry point for the task manager in yarn or standalone mode.
+ * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
+ * and starts them.
+ */
+public class TaskManagerRunner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+
+ /**
+ * Constructs related components of the TaskManager and starts them.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @param rpcService Optionally, the RPC service which is used to start and connect to the TaskManager RpcEndpoint.
+ * If none is given, then a RpcService is constructed from the configuration.
+ * @param taskManagerHostname Optionally, the hostname/address that describes the TaskManager's data location.
+ * If none is given, it is read from the configuration or determined via the leader retrieval service.
+ * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+ * @param haServices Optionally, a high availability service can be provided. If none is given,
+ * then a HighAvailabilityServices is constructed from the configuration.
+ */
+ public static void createAndStartComponents(
+ final Configuration configuration,
+ final ResourceID resourceID,
+ RpcService rpcService,
+ String taskManagerHostname,
+ boolean localTaskManagerCommunication,
+ HighAvailabilityServices haServices) throws Exception {
+
+ checkNotNull(configuration);
+ checkNotNull(resourceID);
+
+ if (taskManagerHostname == null || taskManagerHostname.isEmpty()) {
+ taskManagerHostname = selectNetworkInterface(configuration);
+ }
+
+ if (rpcService == null) {
+ // if no task manager port has been configured, use 0 (system will pick any free port)
+ final int actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ if (actorSystemPort < 0 || actorSystemPort > 65535) {
+ throw new IllegalConfigurationException("Invalid value for '" +
+ ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+ "' (port for the TaskManager actor system) : " + actorSystemPort +
+ " - Leave config parameter empty or use 0 to let the system choose a port automatically.");
+ }
+ rpcService = createRpcService(configuration, taskManagerHostname, actorSystemPort);
+ }
+
+ if(haServices == null) {
+ // fall back to minimal high availability services: only getResourceManagerLeaderRetriever is implemented, the leader election getters return null
+ haServices = new HighAvailabilityServices() {
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ return null;
+ }
+
+ @Override
+ public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+ return null;
+ }
+ };
+ }
+
+ createAndStartTaskManagerComponents(
+ configuration,
+ resourceID,
+ rpcService,
+ taskManagerHostname,
+ haServices,
+ localTaskManagerCommunication);
+ }
+
+ /**
+ * <p/>
+ * This method tries to select the network interface to use for the TaskManager
+ * communication. The network interface is used both for the actor communication
+ * (coordination) as well as for the data exchange between task managers. Unless
+ * the hostname/interface is explicitly configured in the configuration, this
+ * method will try out various interfaces and methods to connect to the JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @return The host name under which the TaskManager communicates.
+ */
+ private static String selectNetworkInterface(Configuration configuration) throws Exception {
+ String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+ if (taskManagerHostname != null) {
+ LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+ } else {
+ LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+ InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, lookupTimeout);
+ taskManagerHostname = taskManagerAddress.getHostName();
+ LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+ taskManagerHostname, taskManagerAddress.getHostAddress());
+ }
+
+ return taskManagerHostname;
+ }
+
+ /**
+ * Utility method to create RPC service from configuration and hostname, port.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+ * @param actorSystemPort The port to bind the TaskManager actor system to; 0 lets the system pick any free port.
+ * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
+ * @throws java.io.IOException Thrown, if the actor system can not bind to the address
+ * @throws java.lang.Exception Thrown if some other error occurs while creating the akka actor system
+ */
+ private static RpcService createRpcService(Configuration configuration, String taskManagerHostname, int actorSystemPort)
+ throws Exception{
+
+ // Bring up the TaskManager actor system first, bind it to the given address.
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
+
+ final ActorSystem taskManagerSystem;
+ try {
+ Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
+ Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+ LOG.debug("Using akka configuration\n " + akkaConfig);
+ taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
+ } catch (Throwable t) {
+ if (t instanceof org.jboss.netty.channel.ChannelException) {
+ Throwable cause = t.getCause();
+ if (cause != null && t.getCause() instanceof java.net.BindException) {
+ String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+ throw new IOException("Unable to bind TaskManager actor system to address " +
+ address + " - " + cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create TaskManager actor system", t);
+ }
+
+ // start akka rpc service based on actor system
+ final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+ final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
+
+ return akkaRpcService;
+ }
+
+ /**
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
+ * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+ * @param haServices The high availability services that are passed on to the TaskExecutor.
+ * No default is constructed in this method; the caller must supply them.
+ * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+ * @throws IllegalConfigurationException Thrown, if the given config contains illegal values.
+ * @throws IOException Thrown, if any of the I/O components (such as buffer pools, I/O manager, ...)
+ * cannot be properly started.
+ * @throws Exception Thrown if some other error occurs while parsing the configuration or
+ * starting the TaskManager components.
+ */
+ private static void createAndStartTaskManagerComponents(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ String taskManagerHostname,
+ HighAvailabilityServices haServices,
+ boolean localTaskManagerCommunication) throws Exception {
+
+ final TaskExecutorConfiguration taskManagerConfig = parseTaskManagerConfiguration(
+ configuration, taskManagerHostname, localTaskManagerCommunication);
+
+ TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
+ resourceID,
+ InetAddress.getByName(taskManagerHostname),
+ taskManagerConfig,
+ configuration);
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ taskManagerConfig,
+ taskManagerComponents.getTaskManagerLocation(),
+ rpcService, taskManagerComponents.getMemoryManager(),
+ taskManagerComponents.getIOManager(),
+ taskManagerComponents.getNetworkEnvironment(),
+ haServices);
+
+ taskExecutor.start();
+ }
+
+ /**
+ * Creates and returns the task manager components.
+ *
+ * @param resourceID resource ID of the task manager
+ * @param taskManagerAddress address of the task manager
+ * @param taskExecutorConfig task manager configuration
+ * @param configuration the Flink configuration
+ * @return task manager components
+ * @throws Exception
+ */
+ private static TaskManagerComponents createTaskManagerComponents(
+ ResourceID resourceID,
+ InetAddress taskManagerAddress,
+ TaskExecutorConfiguration taskExecutorConfig,
+ Configuration configuration) throws Exception {
+
+ MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
+
+ // pre-start checks
+ checkTempDirs(taskExecutorConfig.getTmpDirPaths());
+
+ NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
+
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+ networkEnvironmentConfiguration.numNetworkBuffers(),
+ networkEnvironmentConfiguration.networkBufferSize(),
+ networkEnvironmentConfiguration.memoryType());
+
+ ConnectionManager connectionManager;
+
+ if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+ connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
+ } else {
+ connectionManager = new LocalConnectionManager();
+ }
+
+ ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+ TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+ KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+ KvStateServer kvStateServer;
+
+ if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+ NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
+
+ int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+ nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+ int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+ nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+ kvStateServer = new KvStateServer(
+ taskManagerAddress,
+ networkEnvironmentConfiguration.queryServerPort(),
+ numNetworkThreads,
+ numQueryThreads,
+ kvStateRegistry,
+ new DisabledKvStateRequestStats());
+ } else {
+ kvStateServer = null;
+ }
+
+ // we start the network first, to make sure it can allocate its buffers first
+ final NetworkEnvironment network = new NetworkEnvironment(
+ networkBufferPool,
+ connectionManager,
+ resultPartitionManager,
+ taskEventDispatcher,
+ kvStateRegistry,
+ kvStateServer,
+ networkEnvironmentConfiguration.ioMode(),
+ networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+ networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+ network.start();
+
+ final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+ resourceID,
+ taskManagerAddress,
+ network.getConnectionManager().getDataPort());
+
+ // computing the amount of memory to use depends on how much memory is available
+ // it strictly needs to happen AFTER the network stack has been initialized
+
+ // check if a value has been configured
+ long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the system automatically " +
+ "pick a fraction of the available memory.");
+
+ final long memorySize;
+ boolean preAllocateMemory = configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+ if (configuredMemory > 0) {
+ if (preAllocateMemory) {
+ LOG.info("Using {} MB for managed memory." , configuredMemory);
+ } else {
+ LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+ }
+ memorySize = configuredMemory << 20; // megabytes to bytes
+ } else {
+ float fraction = configuration.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+ checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+ if (memType == MemoryType.HEAP) {
+ long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+ fraction , relativeMemSize >> 20);
+ } else {
+ LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+ "memory will be allocated lazily." , fraction , relativeMemSize >> 20);
+ }
+ memorySize = relativeMemSize;
+ } else if (memType == MemoryType.OFF_HEAP) {
+ // The maximum heap memory has been adjusted according to the fraction
+ long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+ long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+ fraction, directMemorySize >> 20);
+ } else {
+ LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+ " memory will be allocated lazily.", fraction, directMemorySize >> 20);
+ }
+ memorySize = directMemorySize;
+ } else {
+ throw new RuntimeException("No supported memory type detected.");
+ }
+ }
+
+ // now start the memory manager
+ final MemoryManager memoryManager;
+ try {
+ memoryManager = new MemoryManager(
+ memorySize,
+ taskExecutorConfig.getNumberOfSlots(),
+ taskExecutorConfig.getNetworkConfig().networkBufferSize(),
+ memType,
+ preAllocateMemory);
+ } catch (OutOfMemoryError e) {
+ if (memType == MemoryType.HEAP) {
+ throw new Exception("OutOfMemory error (" + e.getMessage() +
+ ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+ } else if (memType == MemoryType.OFF_HEAP) {
+ throw new Exception("OutOfMemory error (" + e.getMessage() +
+ ") while allocating the TaskManager off-heap memory (" + memorySize +
+ " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+ } else {
+ throw e;
+ }
+ }
+
+ // start the I/O manager, it will create some temp directories.
+ final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
+
+ return new TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
+ }
+
+ // --------------------------------------------------------------------------
+ // Parsing and checking the TaskManager Configuration
+ // --------------------------------------------------------------------------
+
+ /**
+ * Utility method to extract TaskManager config parameters from the configuration and to
+ * sanity check them.
+ *
+ * @param configuration The configuration.
+ * @param taskManagerHostname The host name under which the TaskManager communicates.
+ * @param localTaskManagerCommunication True, to skip initializing the network stack.
+ * Use only in cases where only one task manager runs.
+ * @return TaskExecutorConfiguration that wraps the NetworkEnvironmentConfiguration, temp directories, timeouts, etc.
+ */
+ private static TaskExecutorConfiguration parseTaskManagerConfiguration(
+ Configuration configuration,
+ String taskManagerHostname,
+ boolean localTaskManagerCommunication) throws Exception {
+
+ // ------- read values from the config and check them ---------
+ // (a lot of them)
+
+ // ----> hosts / ports for communication and data exchange
+
+ int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+ checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+ "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+ InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
+ final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+ // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
+
+ // we need this because many configs have been written with a "-1" entry
+ int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ if (slots == -1) {
+ slots = 1;
+ }
+
+ checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ "Number of task slots must be at least one.");
+
+ final int numNetworkBuffers = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+ checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+ final int pageSize = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+ // check page size for minimum size
+ checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+
+ // check page size for power of two
+ checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ "Memory segment size must be a power of 2.");
+
+ // check whether we use heap or off-heap memory
+ final MemoryType memType;
+ if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+ memType = MemoryType.OFF_HEAP;
+ } else {
+ memType = MemoryType.HEAP;
+ }
+
+ // initialize the memory segment factory accordingly
+ if (memType == MemoryType.HEAP) {
+ if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+ throw new Exception("Memory type is set to heap memory, but memory segment " +
+ "factory has been initialized for off-heap memory segments");
+ }
+ } else {
+ if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
+ throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+ "factory has been initialized for heap memory segments");
+ }
+ }
+
+ final String[] tmpDirs = configuration.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+ final NettyConfig nettyConfig;
+ if (!localTaskManagerCommunication) {
+ nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
+ taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
+ } else {
+ nettyConfig = null;
+ }
+
+ // Default spill I/O mode for intermediate results
+ final String syncOrAsync = configuration.getString(
+ ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+ final IOManager.IOMode ioMode;
+ if (syncOrAsync.equals("async")) {
+ ioMode = IOManager.IOMode.ASYNC;
+ } else {
+ ioMode = IOManager.IOMode.SYNC;
+ }
+
+ final int queryServerPort = configuration.getInteger(
+ ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+ ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+ final int queryServerNetworkThreads = configuration.getInteger(
+ ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+ ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+ final int queryServerQueryThreads = configuration.getInteger(
+ ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+ ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+ final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+ numNetworkBuffers,
+ pageSize,
+ memType,
+ ioMode,
+ queryServerPort,
+ queryServerNetworkThreads,
+ queryServerQueryThreads,
+ Option.apply(nettyConfig),
+ 500,
+ 3000);
+
+ // ----> timeouts, library caching, profiling
+
+ final FiniteDuration timeout;
+ try {
+ timeout = AkkaUtils.getTimeout(configuration);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+ "'.Use formats like '50 s' or '1 min' to specify the timeout.");
+ }
+ LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
+
+ final long cleanupInterval = configuration.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+ final FiniteDuration finiteRegistrationDuration;
+ try {
+ Duration maxRegistrationDuration = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+ if (maxRegistrationDuration.isFinite()) {
+ finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toSeconds(), TimeUnit.SECONDS);
+ } else {
+ finiteRegistrationDuration = null;
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+ }
+
+ final FiniteDuration initialRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ initialRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+ } else {
+ throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ final FiniteDuration maxRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ maxRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+ } else {
+ throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ final FiniteDuration refusedRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ refusedRegistrationPause = new FiniteDuration(pause.toSeconds(), TimeUnit.SECONDS);
+ } else {
+ throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ return new TaskExecutorConfiguration(
+ tmpDirs,
+ cleanupInterval,
+ networkConfig,
+ timeout,
+ finiteRegistrationDuration,
+ slots,
+ configuration,
+ initialRegistrationPause,
+ maxRegistrationPause,
+ refusedRegistrationPause);
+ }
+
+ /**
+ * Validates a condition for a config parameter and throws a standard exception if the
+ * condition does not hold.
+ *
+ * @param condition The condition that must hold. If the condition is false, an exception is thrown.
+ * @param parameter The parameter value. Will be shown in the exception message.
+ * @param name The name of the config parameter. Will be shown in the exception message.
+ * @param errorMessage The optional custom error message to append to the exception message.
+ */
+ private static void checkConfigParameter(
+ boolean condition,
+ Object parameter,
+ String name,
+ String errorMessage) {
+ if (!condition) {
+ throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
+ }
+ }
+
+ /**
+ * Validates that all the directories denoted by the strings do actually exist, are proper
+ * directories (not files), and are writable.
+ *
+ * @param tmpDirs The array of directory paths to check.
+ * @throws IOException Thrown if any of the directories does not exist or is not writable
+ * or is a file, rather than a directory.
+ */
+ private static void checkTempDirs(String[] tmpDirs) throws IOException {
+ for (String dir : tmpDirs) {
+ if (dir != null && !dir.equals("")) {
+ File file = new File(dir);
+ if (!file.exists()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+ }
+ if (!file.isDirectory()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+ }
+ if (!file.canWrite()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ long totalSpaceGb = file.getTotalSpace() >> 30;
+ long usableSpaceGb = file.getUsableSpace() >> 30;
+ double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100;
+ String path = file.getAbsolutePath();
+ LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+ path, totalSpaceGb, usableSpaceGb, usablePercentage));
+ }
+ } else {
+ throw new IllegalArgumentException("Temporary file directory #$id is null.");
+ }
+ }
+ }
+
+ private static class TaskManagerComponents {
+ private final TaskManagerLocation taskManagerLocation;
+ private final MemoryManager memoryManager;
+ private final IOManager ioManager;
+ private final NetworkEnvironment networkEnvironment;
+
+ private TaskManagerComponents(
+ TaskManagerLocation taskManagerLocation,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment) {
+
+ this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+ this.memoryManager = Preconditions.checkNotNull(memoryManager);
+ this.ioManager = Preconditions.checkNotNull(ioManager);
+ this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+ }
+
+ public MemoryManager getMemoryManager() {
+ return memoryManager;
+ }
+
+ public IOManager getIOManager() {
+ return ioManager;
+ }
+
+ public NetworkEnvironment getNetworkEnvironment() {
+ return networkEnvironment;
+ }
+
+ public TaskManagerLocation getTaskManagerLocation() {
+ return taskManagerLocation;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdc3a0a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 09aab18..26218dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -19,17 +19,22 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.NonHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
import java.util.UUID;
import static org.junit.Assert.*;
@@ -42,19 +47,31 @@ public class TaskExecutorTest extends TestLogger {
final ResourceID resourceID = ResourceID.generate();
final String resourceManagerAddress = "/resource/manager/address/one";
- final TestingRpcService rpc = new TestingRpcService();
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
// register a mock resource manager gateway
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+ TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class);
+ PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
rpc.registerGateway(resourceManagerAddress, rmGateway);
+ TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+ when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
NonHaServices haServices = new NonHaServices(resourceManagerAddress);
- TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
- new Configuration(), resourceID, rpc, "localhost", haServices, true);
- String taskManagerAddress = taskManager.getAddress();
+
+ TaskExecutor taskManager = new TaskExecutor(
+ taskExecutorConfiguration,
+ taskManagerLocation,
+ rpc, mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices);
+
taskManager.start();
+ String taskManagerAddress = taskManager.getAddress();
- verify(rmGateway, timeout(5000)).registerTaskExecutor(
+ verify(rmGateway).registerTaskExecutor(
any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class));
}
finally {
@@ -71,7 +88,7 @@ public class TaskExecutorTest extends TestLogger {
final UUID leaderId1 = UUID.randomUUID();
final UUID leaderId2 = UUID.randomUUID();
- final TestingRpcService rpc = new TestingRpcService();
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
// register the mock resource manager gateways
ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
@@ -84,10 +101,22 @@ public class TaskExecutorTest extends TestLogger {
TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setResourceManagerLeaderRetriever(testLeaderService);
- TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
- new Configuration(), resourceID, rpc, "localhost", haServices, true);
- String taskManagerAddress = taskManager.getAddress();
+ TaskExecutorConfiguration taskExecutorConfiguration = mock(TaskExecutorConfiguration.class);
+ PowerMockito.when(taskExecutorConfiguration.getNumberOfSlots()).thenReturn(1);
+
+ TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+ when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
+ TaskExecutor taskManager = new TaskExecutor(
+ taskExecutorConfiguration,
+ taskManagerLocation,
+ rpc, mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices);
+
taskManager.start();
+ String taskManagerAddress = taskManager.getAddress();
// no connection initially, since there is no leader
assertNull(taskManager.getResourceManagerConnection());
@@ -95,7 +124,7 @@ public class TaskExecutorTest extends TestLogger {
// define a leader and see that a registration happens
testLeaderService.notifyListener(address1, leaderId1);
- verify(rmGateway1, timeout(5000)).registerTaskExecutor(
+ verify(rmGateway1).registerTaskExecutor(
eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
@@ -105,7 +134,7 @@ public class TaskExecutorTest extends TestLogger {
// set a new leader, see that a registration happens
testLeaderService.notifyListener(address2, leaderId2);
- verify(rmGateway2, timeout(5000)).registerTaskExecutor(
+ verify(rmGateway2).registerTaskExecutor(
eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
}