You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:53:08 UTC
[35/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move
TaskExecutor, JobMaster and ResourceManager out of the rpc package
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
new file mode 100644
index 0000000..4871b96
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -0,0 +1,827 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.ExecutionContexts$;
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.NetUtils;
+
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * TaskExecutor implementation. The task executor is responsible for the execution of multiple
+ * {@link org.apache.flink.runtime.taskmanager.Task}.
+ */
+public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+ /** The unique resource ID of this TaskExecutor */
+ private final ResourceID resourceID;
+
+ /** The access to the leader election and metadata storage services */
+ private final HighAvailabilityServices haServices;
+
+ /** The task manager configuration */
+ private final TaskExecutorConfiguration taskExecutorConfig;
+
+ /** The I/O manager component in the task manager */
+ private final IOManager ioManager;
+
+ /** The memory manager component in the task manager */
+ private final MemoryManager memoryManager;
+
+ /** The network component in the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ /** The number of slots in the task manager, should be 1 for YARN */
+ private final int numberOfSlots;
+
+ // --------- resource manager --------
+
+ private TaskExecutorToResourceManagerConnection resourceManagerConnection;
+
+ // ------------------------------------------------------------------------
+
+ /**
+  * Creates a TaskExecutor from its already constructed components.
+  *
+  * <p>All reference-typed components stored by this class are passed through
+  * {@code checkNotNull}; the RPC service is handed to the super class constructor.
+  *
+  * @param taskExecutorConfig The task executor configuration.
+  * @param resourceID The unique resource ID of this TaskExecutor.
+  * @param memoryManager The memory manager component.
+  * @param ioManager The I/O manager component.
+  * @param networkEnvironment The network component.
+  * @param numberOfSlots The number of slots of this TaskExecutor.
+  * @param rpcService The RPC service passed to the {@link RpcEndpoint} super class.
+  * @param haServices The access to the leader election and metadata storage services.
+  */
+ public TaskExecutor(
+         TaskExecutorConfiguration taskExecutorConfig,
+         ResourceID resourceID,
+         MemoryManager memoryManager,
+         IOManager ioManager,
+         NetworkEnvironment networkEnvironment,
+         int numberOfSlots,
+         RpcService rpcService,
+         HighAvailabilityServices haServices) {
+
+     super(rpcService);
+
+     this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
+     this.resourceID = checkNotNull(resourceID);
+     this.memoryManager = checkNotNull(memoryManager);
+     this.ioManager = checkNotNull(ioManager);
+     this.networkEnvironment = checkNotNull(networkEnvironment);
+     // numberOfSlots is a primitive int: a null check would only autobox it and could never fail
+     this.numberOfSlots = numberOfSlots;
+     this.haServices = checkNotNull(haServices);
+ }
+
+ // ------------------------------------------------------------------------
+ // Life cycle
+ // ------------------------------------------------------------------------
+
+ /**
+  * Invokes the super class {@code start()} and then starts the ResourceManager leader
+  * retrieval with a new {@link ResourceManagerLeaderListener}. Any exception raised while
+  * obtaining or starting the retrieval service is handed to
+  * {@link #onFatalErrorAsync(Throwable)}.
+  */
+ @Override
+ public void start() {
+     super.start();
+
+     // the first step is to find (and later connect to) the ResourceManager leader
+     try {
+         final LeaderRetrievalService leaderRetriever = haServices.getResourceManagerLeaderRetriever();
+         leaderRetriever.start(new ResourceManagerLeaderListener());
+     } catch (Exception startFailure) {
+         onFatalErrorAsync(startFailure);
+     }
+ }
+
+ // ------------------------------------------------------------------------
+ // RPC methods - ResourceManager related
+ // ------------------------------------------------------------------------
+
+ @RpcMethod
+ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
+ if (resourceManagerConnection != null) {
+ if (newLeaderAddress != null) {
+ // the resource manager switched to a new leader
+ log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+ resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+ }
+ else {
+ // address null means that the current leader is lost without a new leader being there, yet
+ log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+ resourceManagerConnection.getResourceManagerAddress());
+ }
+
+ // drop the current connection or connection attempt
+ if (resourceManagerConnection != null) {
+ resourceManagerConnection.close();
+ resourceManagerConnection = null;
+ }
+ }
+
+ // establish a connection to the new leader
+ if (newLeaderAddress != null) {
+ log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+ resourceManagerConnection =
+ new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
+ resourceManagerConnection.start();
+ }
+ }
+
+ /**
+  * Determines the address of the TaskManager and then starts and runs it.
+  *
+  * <p>The hostname and port are obtained from
+  * {@link #selectNetworkInterfaceAndPort(Configuration)}: a configured hostname is used if
+  * present, otherwise a connecting address is looked up via the leader retrieval service.
+  * The TaskManager is then started with that hostname and port through {@code runTaskManager}.
+  *
+  * @param configuration The configuration for the TaskManager.
+  * @param resourceID The id of the resource which the task manager will run on.
+  * @throws Exception Thrown, if selecting the address or running the TaskManager fails.
+  */
+ public static void selectNetworkInterfaceAndRunTaskManager(
+         Configuration configuration,
+         ResourceID resourceID) throws Exception {
+
+     final InetSocketAddress selectedAddress = selectNetworkInterfaceAndPort(configuration);
+     final String hostname = selectedAddress.getHostName();
+     final int port = selectedAddress.getPort();
+
+     runTaskManager(hostname, resourceID, port, configuration);
+ }
+
+ /**
+  * Selects the hostname and actor system port for the TaskManager.
+  *
+  * <p>The hostname is taken from the configuration if set; otherwise it is the host name of
+  * the address found by {@code LeaderRetrievalUtils.findConnectingAddress}. The port is read
+  * from the configuration, with 0 as the default.
+  *
+  * @param configuration The configuration for the TaskManager.
+  * @return The socket address made of the selected hostname and port.
+  * @throws IllegalConfigurationException Thrown, if the configured port is outside 0-65535.
+  * @throws Exception Thrown, if looking up the connecting address fails.
+  */
+ private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+         throws Exception {
+     String hostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+     if (hostname == null) {
+         final LeaderRetrievalService leaderRetriever = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+         final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+         final InetAddress connectingAddress = LeaderRetrievalUtils.findConnectingAddress(leaderRetriever, lookupTimeout);
+         hostname = connectingAddress.getHostName();
+         LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+                 hostname, connectingAddress.getHostAddress());
+     } else {
+         LOG.info("Using configured hostname/address for TaskManager: " + hostname);
+     }
+
+     // port 0 (also the default) lets the system pick any free port
+     final int port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+     final boolean portInRange = port >= 0 && port <= 65535;
+     if (!portInRange) {
+         throw new IllegalConfigurationException("Invalid value for '" +
+                 ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+                 "' (port for the TaskManager actor system) : " + port +
+                 " - Leave config parameter empty or use 0 to let the system choose a port automatically.");
+     }
+
+     return new InetSocketAddress(hostname, port);
+ }
+
+ /**
+ * Starts and runs the TaskManager. Brings up an actor system bound to the given hostname
+ * and port, creates an {@link AkkaRpcService} on top of it, creates and starts the
+ * {@link TaskExecutor}, and then blocks until the actor system terminates.
+ * <p/>
+ * If enabled in the configuration (and INFO logging is on), the periodic
+ * {@link MemoryLogger} is started as well. If anything fails after the actor system has
+ * been created, the actor system is shut down and the error is rethrown.
+ *
+ * @param taskManagerHostname The hostname/address of the interface where the actor system
+ * will communicate.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @param actorSystemPort The port at which the actor system will communicate.
+ * @param configuration The configuration for the TaskManager.
+ * @throws IOException Thrown, if the actor system cannot be bound to the given address.
+ * @throws Exception Thrown, if the actor system cannot be created for another reason, or
+ * if starting the TaskManager components fails.
+ */
+ private static void runTaskManager(
+ String taskManagerHostname,
+ ResourceID resourceID,
+ int actorSystemPort,
+ final Configuration configuration) throws Exception {
+
+ LOG.info("Starting TaskManager");
+
+ // Bring up the TaskManager actor system first, bind it to the given address.
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort));
+
+ final ActorSystem taskManagerSystem;
+ try {
+ Tuple2<String, Object> address = new Tuple2<String, Object>(taskManagerHostname, actorSystemPort);
+ Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+ LOG.debug("Using akka configuration\n " + akkaConfig);
+ taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
+ } catch (Throwable t) {
+ // a bind failure gets a dedicated IOException that names the address
+ if (t instanceof org.jboss.netty.channel.ChannelException) {
+ Throwable cause = t.getCause();
+ if (cause != null && t.getCause() instanceof java.net.BindException) {
+ String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+ throw new IOException("Unable to bind TaskManager actor system to address " +
+ address + " - " + cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create TaskManager actor system", t);
+ }
+
+ // start akka rpc service based on actor system
+ final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+ final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
+
+ // start high availability service to implement getResourceManagerLeaderRetriever method only
+ // (both leader election getters below return null)
+ final HighAvailabilityServices haServices = new HighAvailabilityServices() {
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ return LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ return null;
+ }
+
+ @Override
+ public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+ return null;
+ }
+ };
+
+ // start all the TaskManager services (network stack, library cache, ...)
+ // and the TaskManager actor
+ try {
+ LOG.info("Starting TaskManager actor");
+ TaskExecutor taskExecutor = startTaskManagerComponentsAndActor(
+ configuration,
+ resourceID,
+ akkaRpcService,
+ taskManagerHostname,
+ haServices,
+ false);
+
+ taskExecutor.start();
+
+ // if desired, start the logging daemon that periodically logs the memory usage information
+ if (LOG.isInfoEnabled() && configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+ LOG.info("Starting periodic memory usage logger");
+
+ long interval = configuration.getLong(
+ ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
+
+ MemoryLogger logger = new MemoryLogger(LOG, interval, taskManagerSystem);
+ logger.start();
+ }
+
+ // block until everything is done
+ taskManagerSystem.awaitTermination();
+ } catch (Throwable t) {
+ LOG.error("Error while starting up taskManager", t);
+ // best-effort shutdown of the actor system; a failure here is only logged so that
+ // the original error is the one that gets rethrown
+ try {
+ taskManagerSystem.shutdown();
+ } catch (Throwable tt) {
+ LOG.warn("Could not cleanly shut down actor system", tt);
+ }
+ throw t;
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Starting and running the TaskManager
+ // --------------------------------------------------------------------------
+
+ /**
+ * Parses the TaskManager configuration and creates the {@link TaskExecutor} together with
+ * its components (network environment, memory manager, I/O manager). The returned
+ * TaskExecutor is not started by this method.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task manager will run on.
+ * @param rpcService The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
+ * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
+ * @param haServices The high availability services that are handed to the TaskExecutor.
+ * @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
+ * @return The newly created TaskExecutor.
+ * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values.
+ * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
+ * I/O manager, ...) cannot be properly started.
+ * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration
+ * or starting the TaskManager components.
+ */
+ public static TaskExecutor startTaskManagerComponentsAndActor(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ String taskManagerHostname,
+ HighAvailabilityServices haServices,
+ boolean localTaskManagerCommunication) throws Exception {
+
+ final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration(
+ configuration, taskManagerHostname, localTaskManagerCommunication);
+
+ MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
+
+ // pre-start checks
+ checkTempDirs(taskExecutorConfig.getTmpDirPaths());
+
+ ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+
+ // we start the network first, to make sure it can allocate its buffers first
+ final NetworkEnvironment network = new NetworkEnvironment(
+ executionContext,
+ taskExecutorConfig.getTimeout(),
+ taskExecutorConfig.getNetworkConfig(),
+ taskExecutorConfig.getConnectionInfo());
+
+ // computing the amount of memory to use depends on how much memory is available
+ // it strictly needs to happen AFTER the network stack has been initialized
+
+ // check if a value has been configured
+ // (-1 is the "not configured" marker, any other value must be positive)
+ long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the system automatically " +
+ "pick a fraction of the available memory.");
+
+ final long memorySize;
+ boolean preAllocateMemory = configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+ if (configuredMemory > 0) {
+ // an explicitly configured size (in megabytes) takes precedence over the fraction
+ if (preAllocateMemory) {
+ LOG.info("Using {} MB for managed memory." , configuredMemory);
+ } else {
+ LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+ }
+ memorySize = configuredMemory << 20; // megabytes to bytes
+ } else {
+ // no explicit size: derive the size from the configured memory fraction
+ float fraction = configuration.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+ checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+ if (memType == MemoryType.HEAP) {
+ long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+ fraction , relativeMemSize >> 20);
+ } else {
+ LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+ "memory will be allocated lazily." , fraction , relativeMemSize >> 20);
+ }
+ memorySize = relativeMemSize;
+ } else if (memType == MemoryType.OFF_HEAP) {
+ // The maximum heap memory has been adjusted according to the fraction
+ // so the total is heap / (1 - fraction), and the off-heap part is that total * fraction
+ long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+ long directMemorySize = (long) (maxMemory / (1.0 - fraction) * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+ fraction, directMemorySize >> 20);
+ } else {
+ LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+ " memory will be allocated lazily.", fraction, directMemorySize >> 20);
+ }
+ memorySize = directMemorySize;
+ } else {
+ throw new RuntimeException("No supported memory type detected.");
+ }
+ }
+
+ // now start the memory manager
+ // NOTE(review): if this fails, the NetworkEnvironment created above is not released
+ // in this method - confirm whether a cleanup is needed here
+ final MemoryManager memoryManager;
+ try {
+ memoryManager = new MemoryManager(
+ memorySize,
+ taskExecutorConfig.getNumberOfSlots(),
+ taskExecutorConfig.getNetworkConfig().networkBufferSize(),
+ memType,
+ preAllocateMemory);
+ } catch (OutOfMemoryError e) {
+ // wrap the error in an exception that names the memory type and the requested size
+ if (memType == MemoryType.HEAP) {
+ throw new Exception("OutOfMemory error (" + e.getMessage() +
+ ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+ } else if (memType == MemoryType.OFF_HEAP) {
+ throw new Exception("OutOfMemory error (" + e.getMessage() +
+ ") while allocating the TaskManager off-heap memory (" + memorySize +
+ " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+ } else {
+ throw e;
+ }
+ }
+
+ // start the I/O manager, it will create some temp directories.
+ final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ taskExecutorConfig,
+ resourceID,
+ memoryManager,
+ ioManager,
+ network,
+ taskExecutorConfig.getNumberOfSlots(),
+ rpcService,
+ haServices);
+
+ return taskExecutor;
+ }
+
+ // --------------------------------------------------------------------------
+ // Parsing and checking the TaskManager Configuration
+ // --------------------------------------------------------------------------
+
+ /**
+  * Utility method to extract TaskManager config parameters from the configuration and to
+  * sanity check them.
+  *
+  * <p>Besides reading values, this method initializes the {@link MemorySegmentFactory} for
+  * the configured memory type and, if the data port is configured as 0, picks an available
+  * port via {@code NetUtils.getAvailablePort()}.
+  *
+  * @param configuration The configuration.
+  * @param taskManagerHostname The host name under which the TaskManager communicates.
+  * @param localTaskManagerCommunication True, to skip initializing the network stack.
+  *                                      Use only in cases where only one task manager runs.
+  * @return TaskExecutorConfiguration that wraps InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
+  * @throws IllegalConfigurationException Thrown, if a checked config value is invalid.
+  * @throws IllegalArgumentException Thrown, if a timeout or duration value cannot be parsed,
+  *                                  or if a registration pause is not finite.
+  * @throws Exception Thrown, if the memory segment factory was already initialized for a
+  *                   different memory type, or if the host name cannot be resolved.
+  */
+ private static TaskExecutorConfiguration parseTaskManagerConfiguration(
+         Configuration configuration,
+         String taskManagerHostname,
+         boolean localTaskManagerCommunication) throws Exception {
+
+     // ------- read values from the config and check them ---------
+     // (a lot of them)
+
+     // ----> hosts / ports for communication and data exchange
+
+     int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+             ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+     if (dataport == 0) {
+         dataport = NetUtils.getAvailablePort();
+     }
+     checkConfigParameter(dataport > 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+             "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+     InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
+     final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport);
+
+     // ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
+
+     // we need this because many configs have been written with a "-1" entry
+     int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+     if (slots == -1) {
+         slots = 1;
+     }
+     checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+             "Number of task slots must be at least one.");
+
+     final int numNetworkBuffers = configuration.getInteger(
+             ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+             ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+     checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+             ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+     final int pageSize = configuration.getInteger(
+             ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+             ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+     // check page size for minimum size
+     checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+             ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+             "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+     // check page size for power of two
+     checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+             ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+             "Memory segment size must be a power of 2.");
+
+     // check whether we use heap or off-heap memory
+     final MemoryType memType;
+     if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+         memType = MemoryType.OFF_HEAP;
+     } else {
+         memType = MemoryType.HEAP;
+     }
+
+     // initialize the memory segment factory accordingly
+     if (memType == MemoryType.HEAP) {
+         if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+             throw new Exception("Memory type is set to heap memory, but memory segment " +
+                     "factory has been initialized for off-heap memory segments");
+         }
+     } else {
+         if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
+             throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+                     "factory has been initialized for heap memory segments");
+         }
+     }
+
+     // the temp directories may be separated by ',' or by the platform's path separator
+     final String[] tmpDirs = configuration.getString(
+             ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+             ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+     final NettyConfig nettyConfig;
+     if (!localTaskManagerCommunication) {
+         nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration);
+     } else {
+         nettyConfig = null;
+     }
+
+     // Default spill I/O mode for intermediate results
+     final String syncOrAsync = configuration.getString(
+             ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+             ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+     final IOMode ioMode;
+     if (syncOrAsync.equals("async")) {
+         ioMode = IOManager.IOMode.ASYNC;
+     } else {
+         ioMode = IOManager.IOMode.SYNC;
+     }
+
+     final int queryServerPort = configuration.getInteger(
+             ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+             ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+     final int queryServerNetworkThreads = configuration.getInteger(
+             ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+             ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+     final int queryServerQueryThreads = configuration.getInteger(
+             ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+             ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+     final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
+             numNetworkBuffers,
+             pageSize,
+             memType,
+             ioMode,
+             queryServerPort,
+             queryServerNetworkThreads,
+             queryServerQueryThreads,
+             localTaskManagerCommunication ? Option.<NettyConfig>empty() : new Some<>(nettyConfig),
+             new Tuple2<>(500, 3000));
+
+     // ----> timeouts, library caching, profiling
+
+     final FiniteDuration timeout;
+     try {
+         timeout = AkkaUtils.getTimeout(configuration);
+     } catch (Exception e) {
+         // keep the original exception as cause so the parse problem stays diagnosable
+         throw new IllegalArgumentException(
+                 "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+                 "'.Use formats like '50 s' or '1 min' to specify the timeout.", e);
+     }
+     LOG.info("Messages between TaskManager and JobManager have a max timeout of " + timeout);
+
+     // the cleanup interval is multiplied by 1000 (presumably seconds to milliseconds)
+     final long cleanupInterval = configuration.getLong(
+             ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+             ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+     // the maximum registration duration may be infinite, which is represented as null
+     final FiniteDuration finiteRegistrationDuration;
+     try {
+         Duration maxRegistrationDuration = Duration.create(configuration.getString(
+                 ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+                 ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+         if (maxRegistrationDuration.isFinite()) {
+             // convert via milliseconds so that sub-second values are not truncated to 0
+             finiteRegistrationDuration = new FiniteDuration(maxRegistrationDuration.toMillis(), TimeUnit.MILLISECONDS);
+         } else {
+             finiteRegistrationDuration = null;
+         }
+     } catch (NumberFormatException e) {
+         throw new IllegalArgumentException("Invalid format for parameter " +
+                 ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+     }
+
+     final FiniteDuration initialRegistrationPause = parseFiniteRegistrationPause(
+             configuration,
+             ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+             ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+             "initial");
+
+     final FiniteDuration maxRegistrationPause = parseFiniteRegistrationPause(
+             configuration,
+             ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+             ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE,
+             "maximum");
+
+     final FiniteDuration refusedRegistrationPause = parseFiniteRegistrationPause(
+             configuration,
+             ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+             ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+             "refused");
+
+     return new TaskExecutorConfiguration(
+             tmpDirs,
+             cleanupInterval,
+             connectionInfo,
+             networkConfig,
+             timeout,
+             finiteRegistrationDuration,
+             slots,
+             configuration,
+             initialRegistrationPause,
+             maxRegistrationPause,
+             refusedRegistrationPause);
+ }
+
+ /**
+  * Reads one registration pause from the configuration and requires it to be finite.
+  *
+  * @param configuration The configuration to read from.
+  * @param key The config key of the pause; also named in the error message on bad format.
+  * @param defaultValue The duration string used when the key is not set.
+  * @param description The kind of pause ("initial", "maximum", "refused") for the error message.
+  * @return The parsed pause with millisecond precision.
+  * @throws IllegalArgumentException Thrown, if the value is not finite or has an invalid format.
+  */
+ private static FiniteDuration parseFiniteRegistrationPause(
+         Configuration configuration,
+         String key,
+         String defaultValue,
+         String description) {
+     try {
+         Duration pause = Duration.create(configuration.getString(key, defaultValue));
+         if (pause.isFinite()) {
+             // milliseconds, not seconds: a value such as "500 ms" must not collapse to 0
+             return new FiniteDuration(pause.toMillis(), TimeUnit.MILLISECONDS);
+         } else {
+             throw new IllegalArgumentException("The " + description + " registration pause must be finite: " + pause);
+         }
+     } catch (NumberFormatException e) {
+         throw new IllegalArgumentException("Invalid format for parameter " + key, e);
+     }
+ }
+
+ /**
+ * Validates a condition for a config parameter and displays a standard exception, if the
+ * the condition does not hold.
+ *
+ * @param condition The condition that must hold. If the condition is false, an exception is thrown.
+ * @param parameter The parameter value. Will be shown in the exception message.
+ * @param name The name of the config parameter. Will be shown in the exception message.
+ * @param errorMessage The optional custom error message to append to the exception message.
+ */
+ private static void checkConfigParameter(
+ boolean condition,
+ Object parameter,
+ String name,
+ String errorMessage) {
+ if (!condition) {
+ throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
+ }
+ }
+
+ /**
+ * Validates that all the directories denoted by the strings do actually exist, are proper
+ * directories (not files), and are writable.
+ *
+ * @param tmpDirs The array of directory paths to check.
+ * @throws Exception Thrown if any of the directories does not exist or is not writable
+ * or is a file, rather than a directory.
+ */
+ private static void checkTempDirs(String[] tmpDirs) throws IOException {
+ for (String dir : tmpDirs) {
+ if (dir != null && !dir.equals("")) {
+ File file = new File(dir);
+ if (!file.exists()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+ }
+ if (!file.isDirectory()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+ }
+ if (!file.canWrite()) {
+ throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ long totalSpaceGb = file.getTotalSpace() >> 30;
+ long usableSpaceGb = file.getUsableSpace() >> 30;
+ double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100;
+ String path = file.getAbsolutePath();
+ LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+ path, totalSpaceGb, usableSpaceGb, usablePercentage));
+ }
+ } else {
+ throw new IllegalArgumentException("Temporary file directory #$id is null.");
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+  * Returns the unique resource ID of this TaskExecutor.
+  *
+  * @return The resource ID given at construction time.
+  */
+ public ResourceID getResourceID() {
+     return this.resourceID;
+ }
+
+ // ------------------------------------------------------------------------
+ // Error Handling
+ // ------------------------------------------------------------------------
+
+ /**
+ * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
+ * This method should be used when asynchronous threads want to notify the
+ * TaskExecutor of a fatal error.
+ *
+ * @param t The exception describing the fatal error
+ */
+ void onFatalErrorAsync(final Throwable t) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ onFatalError(t);
+ }
+ });
+ }
+
+ /**
+ * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
+ * This method must only be called from within the TaskExecutor's main thread.
+ * <p/>
+ * Currently the error is only logged.
+ *
+ * @param t The exception describing the fatal error
+ */
+ void onFatalError(Throwable t) {
+ // to be determined, probably delegate to a fatal error handler that
+ // would either log (mini cluster) or kill the process (yarn, mesos, ...)
+ log.error("FATAL ERROR", t);
+ }
+
+ // ------------------------------------------------------------------------
+ // Access to fields for testing
+ // ------------------------------------------------------------------------
+
+ /**
+  * Gives tests access to the current ResourceManager connection.
+  *
+  * @return The current connection, or null if no connection has been created or the last
+  *         one was dropped.
+  */
+ @VisibleForTesting
+ TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
+     return this.resourceManagerConnection;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utility classes
+ // ------------------------------------------------------------------------
+
+ /**
+  * Listener that forwards ResourceManager leader changes and retrieval errors to the
+  * enclosing TaskExecutor.
+  */
+ private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+
+     /**
+      * Passes the new leader on to the TaskExecutor through its own gateway.
+      */
+     @Override
+     public void notifyLeaderAddress(String newLeaderAddress, UUID newLeaderSessionId) {
+         getSelf().notifyOfNewResourceManagerLeader(newLeaderAddress, newLeaderSessionId);
+     }
+
+     /**
+      * Reports a leader retrieval error as a fatal error of the TaskExecutor.
+      */
+     @Override
+     public void handleError(Exception retrievalError) {
+         onFatalErrorAsync(retrievalError);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
new file mode 100644
index 0000000..3707a47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Holder for the configuration values of a {@link TaskExecutor}; every field is final and assigned once in the constructor.
+ */
+public class TaskExecutorConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String[] tmpDirPaths; // stored and handed out by the getter without copying
+
+ private final long cleanupInterval;
+
+ private final int numberOfSlots;
+
+ private final Configuration configuration;
+
+ private final FiniteDuration timeout;
+ private final FiniteDuration maxRegistrationDuration; // the only reference-typed field that is not null-checked
+ private final FiniteDuration initialRegistrationPause;
+ private final FiniteDuration maxRegistrationPause;
+ private final FiniteDuration refusedRegistrationPause;
+
+ private final NetworkEnvironmentConfiguration networkConfig;
+
+ private final InstanceConnectionInfo connectionInfo;
+
+ public TaskExecutorConfiguration(
+ String[] tmpDirPaths,
+ long cleanupInterval,
+ InstanceConnectionInfo connectionInfo,
+ NetworkEnvironmentConfiguration networkConfig,
+ FiniteDuration timeout,
+ FiniteDuration maxRegistrationDuration,
+ int numberOfSlots,
+ Configuration configuration) {
+ // delegates to the full constructor and supplies the default registration pauses
+ this(tmpDirPaths,
+ cleanupInterval,
+ connectionInfo,
+ networkConfig,
+ timeout,
+ maxRegistrationDuration,
+ numberOfSlots,
+ configuration,
+ new FiniteDuration(500, TimeUnit.MILLISECONDS), // initialRegistrationPause
+ new FiniteDuration(30, TimeUnit.SECONDS), // maxRegistrationPause
+ new FiniteDuration(10, TimeUnit.SECONDS)); // refusedRegistrationPause
+ }
+
+ public TaskExecutorConfiguration(
+ String[] tmpDirPaths,
+ long cleanupInterval,
+ InstanceConnectionInfo connectionInfo,
+ NetworkEnvironmentConfiguration networkConfig,
+ FiniteDuration timeout,
+ FiniteDuration maxRegistrationDuration,
+ int numberOfSlots,
+ Configuration configuration,
+ FiniteDuration initialRegistrationPause,
+ FiniteDuration maxRegistrationPause,
+ FiniteDuration refusedRegistrationPause) {
+ // primitives (cleanupInterval, numberOfSlots) can never be null, so they are assigned without boxing them for a null check
+ this.tmpDirPaths = checkNotNull(tmpDirPaths);
+ this.cleanupInterval = cleanupInterval;
+ this.connectionInfo = checkNotNull(connectionInfo);
+ this.networkConfig = checkNotNull(networkConfig);
+ this.timeout = checkNotNull(timeout);
+ this.maxRegistrationDuration = maxRegistrationDuration; // NOTE(review): deliberately left unchecked as before; presumably null is a legal value - confirm
+ this.numberOfSlots = numberOfSlots;
+ this.configuration = checkNotNull(configuration);
+ this.initialRegistrationPause = checkNotNull(initialRegistrationPause);
+ this.maxRegistrationPause = checkNotNull(maxRegistrationPause);
+ this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ public String[] getTmpDirPaths() {
+ return tmpDirPaths;
+ }
+
+ public long getCleanupInterval() {
+ return cleanupInterval;
+ }
+
+ public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; }
+
+ public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; }
+
+ public FiniteDuration getTimeout() {
+ return timeout;
+ }
+
+ public FiniteDuration getMaxRegistrationDuration() {
+ return maxRegistrationDuration;
+ }
+
+ public int getNumberOfSlots() {
+ return numberOfSlots;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public FiniteDuration getInitialRegistrationPause() {
+ return initialRegistrationPause;
+ }
+
+ public FiniteDuration getMaxRegistrationPause() {
+ return maxRegistrationPause;
+ }
+
+ public FiniteDuration getRefusedRegistrationPause() {
+ return refusedRegistrationPause;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
new file mode 100644
index 0000000..6c99706
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.UUID;
+
+/**
+ * RPC gateway interface of the {@link TaskExecutor}.
+ */
+public interface TaskExecutorGateway extends RpcGateway {
+
+ // ------------------------------------------------------------------------
+ // ResourceManager handlers
+ // ------------------------------------------------------------------------
+
+ void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); // passes on the address and leader id of a new ResourceManager leader
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
new file mode 100644
index 0000000..b357f52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
+import java.io.Serializable;
+
+/**
+ * Response from the ResourceManager to a successful registration attempt by a
+ * TaskExecutor. The class is final; it is not a base class for other responses.
+ */
+public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.Success implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final InstanceID registrationId;
+
+ private final long heartbeatInterval;
+
+ /**
+ * Create a new {@code TaskExecutorRegistrationSuccess} message.
+ *
+ * @param registrationId The ID that the ResourceManager assigned the registration.
+ * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor.
+ */
+ public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) {
+ this.registrationId = registrationId;
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ /**
+ * Gets the ID that the ResourceManager assigned the registration.
+ */
+ public InstanceID getRegistrationId() {
+ return registrationId;
+ }
+
+ /**
+ * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor.
+ */
+ public long getHeartbeatInterval() {
+ return heartbeatInterval;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')';
+ }
+
+}
+
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
new file mode 100644
index 0000000..25332a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The connection between a TaskExecutor and the ResourceManager. {@link #start()} begins a retrying registration, {@link #close()} cancels a pending one.
+ */
+public class TaskExecutorToResourceManagerConnection {
+
+ /** The logger for all log messages of this class; also handed to the registration created in start(). */
+ private final Logger log;
+
+ /** The TaskExecutor whose connection to the ResourceManager this represents. */
+ private final TaskExecutor taskExecutor;
+
+ private final UUID resourceManagerLeaderId;
+
+ private final String resourceManagerAddress;
+
+ private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; // created in start(), cancelled in close()
+
+ private ResourceManagerGateway registeredResourceManager; // assigned only by the success callback registered in start()
+
+ private InstanceID registrationId; // assigned only by the success callback registered in start()
+
+ /** Flag indicating that the connection is closed; set by close() and never reset. */
+ private volatile boolean closed;
+
+
+ public TaskExecutorToResourceManagerConnection(
+ Logger log,
+ TaskExecutor taskExecutor,
+ String resourceManagerAddress,
+ UUID resourceManagerLeaderId) {
+
+ this.log = checkNotNull(log);
+ this.taskExecutor = checkNotNull(taskExecutor);
+ this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+ this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+ }
+
+ // ------------------------------------------------------------------------
+ // Life cycle
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ public void start() {
+ checkState(!closed, "The connection is already closed");
+ checkState(!isRegistered() && pendingRegistration == null, "The connection is already started");
+
+ pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
+ log, taskExecutor.getRpcService(),
+ resourceManagerAddress, resourceManagerLeaderId,
+ taskExecutor.getAddress(), taskExecutor.getResourceID());
+ pendingRegistration.startRegistration();
+
+ Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
+
+ future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
+ @Override
+ public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
+ registeredResourceManager = result.f0; // from here on isRegistered() returns true
+ registrationId = result.f1.getRegistrationId();
+ }
+ }, taskExecutor.getMainThreadExecutionContext()); // callback is run in the execution context supplied by the TaskExecutor
+
+ // this future should only ever fail if there is a bug, not if the registration is declined
+ future.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) {
+ taskExecutor.onFatalError(failure);
+ }
+ }, taskExecutor.getMainThreadExecutionContext());
+ }
+
+ public void close() {
+ closed = true;
+
+ // make sure we do not keep re-trying forever
+ if (pendingRegistration != null) {
+ pendingRegistration.cancel();
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ public UUID getResourceManagerLeaderId() {
+ return resourceManagerLeaderId;
+ }
+
+ public String getResourceManagerAddress() {
+ return resourceManagerAddress;
+ }
+
+ /**
+ * Gets the ResourceManagerGateway. This returns null until the registration is completed.
+ */
+ public ResourceManagerGateway getResourceManager() {
+ return registeredResourceManager;
+ }
+
+ /**
+ * Gets the ID under which the TaskExecutor is registered at the ResourceManager.
+ * This returns null until the registration is completed.
+ */
+ public InstanceID getRegistrationId() {
+ return registrationId;
+ }
+
+ public boolean isRegistered() {
+ return registeredResourceManager != null;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("Connection to ResourceManager %s (leaderId=%s)",
+ resourceManagerAddress, resourceManagerLeaderId);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class ResourceManagerRegistration
+ extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { // registers through ResourceManagerGateway#registerTaskExecutor
+
+ private final String taskExecutorAddress;
+
+ private final ResourceID resourceID;
+
+ ResourceManagerRegistration(
+ Logger log,
+ RpcService rpcService,
+ String targetAddress,
+ UUID leaderId,
+ String taskExecutorAddress,
+ ResourceID resourceID) {
+
+ super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
+ this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
+ this.resourceID = checkNotNull(resourceID);
+ }
+
+ @Override
+ protected Future<RegistrationResponse> invokeRegistration(
+ ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
+
+ FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS); // the gateway call takes a FiniteDuration, not raw milliseconds
+ return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
new file mode 100644
index 0000000..744308c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+
+/**
+ * Runs tests to ensure that a cluster is shut down properly.
+ */
+public class ClusterShutdownITCase extends TestLogger {
+
+ private static ActorSystem system; // shared by both tests; created in setup(), shut down in teardown()
+
+ private static Configuration config = new Configuration();
+
+ @BeforeClass
+ public static void setup() {
+ system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ /**
+ * Tests a faked cluster shutdown procedure without the ResourceManager.
+ */
+ @Test
+ public void testClusterShutdownWithoutResourceManager() {
+
+ new JavaTestKit(system){{
+ new Within(duration("30 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorGateway me =
+ TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+ // start job manager which doesn't shut down the actor system
+ ActorGateway jobManager =
+ TestingUtils.createJobManager(system, config, "jobmanager1");
+
+ // Tell the JobManager to inform us of shutdown actions
+ jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+ // Register a TaskManager
+ ActorGateway taskManager =
+ TestingUtils.createTaskManager(system, jobManager, config, true, true);
+
+ // Tell the TaskManager to inform us of TaskManager shutdowns
+ taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+
+ // No resource manager connected
+ jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+ expectMsgAllOf(
+ new TestingMessages.ComponentShutdown(taskManager.actor()),
+ new TestingMessages.ComponentShutdown(jobManager.actor()),
+ StopClusterSuccessful.getInstance()
+ );
+
+ }};
+ }};
+ }
+
+ /**
+ * Tests a faked cluster shutdown procedure with the ResourceManager.
+ */
+ @Test
+ public void testClusterShutdownWithResourceManager() {
+
+ new JavaTestKit(system){{
+ new Within(duration("30 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorGateway me =
+ TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+ // start job manager which doesn't shut down the actor system
+ ActorGateway jobManager =
+ TestingUtils.createJobManager(system, config, "jobmanager2");
+
+ // Tell the JobManager to inform us of shutdown actions
+ jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+ // Register a TaskManager
+ ActorGateway taskManager =
+ TestingUtils.createTaskManager(system, jobManager, config, true, true);
+
+ // Tell the TaskManager to inform us of TaskManager shutdowns
+ taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+ // Start resource manager and let it register
+ ActorGateway resourceManager =
+ TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+ // Tell the ResourceManager to inform us of ResourceManager shutdowns
+ resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
+
+ // notify about a resource manager registration at the job manager
+ resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+ // Wait for resource manager
+ expectMsgEquals(Messages.getAcknowledge());
+
+
+ // Shutdown cluster with resource manager connected
+ jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+ expectMsgAllOf(
+ new TestingMessages.ComponentShutdown(taskManager.actor()),
+ new TestingMessages.ComponentShutdown(jobManager.actor()),
+ new TestingMessages.ComponentShutdown(resourceManager.actor()),
+ StopClusterSuccessful.getInstance()
+ );
+
+ }};
+ }};
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
new file mode 100644
index 0000000..1565dc3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Integration test cases that verify the interaction of the resource manager with job manager and task managers.
+ * Runs all tests in one Actor system.
+ */
+public class ResourceManagerITCase extends TestLogger {
+
+ private static ActorSystem system; // shared by both tests; created in setup(), shut down in teardown()
+
+ private static Configuration config = new Configuration();
+
+ @BeforeClass
+ public static void setup() {
+ system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ /**
+ * Tests whether the resource manager connects and reconciles existing task managers.
+ */
+ @Test
+ public void testResourceManagerReconciliation() {
+
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorGateway jobManager =
+ TestingUtils.createJobManager(system, config, "ReconciliationTest");
+ ActorGateway me =
+ TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+ // !! no resource manager started !!
+
+ ResourceID resourceID = ResourceID.generate();
+
+ TaskManagerLocation location = mock(TaskManagerLocation.class);
+ when(location.getResourceID()).thenReturn(resourceID);
+
+ HardwareDescription resourceProfile = HardwareDescription.extractFromSystem(1_000_000); // NOTE(review): 1_000_000 is presumably a memory size in bytes - confirm
+
+ jobManager.tell(
+ new RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 1),
+ me);
+
+ expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+ // now start the resource manager
+ ActorGateway resourceManager =
+ TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+ // ask the resource manager to notify us once it is connected to the job manager
+ resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+ // Wait for resource manager
+ expectMsgEquals(Messages.getAcknowledge());
+
+ // check if we registered the task manager resource
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
+
+ TestingResourceManager.GetRegisteredResourcesReply reply =
+ expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(1, reply.resources.size());
+ assertTrue(reply.resources.contains(resourceID));
+
+ }};
+ }};
+ }
+
+ /**
+ * Tests whether the resource manager gets informed upon TaskManager registration.
+ */
+ @Test
+ public void testResourceManagerTaskManagerRegistration() {
+
+ new JavaTestKit(system){{
+ new Within(duration("30 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorGateway jobManager =
+ TestingUtils.createJobManager(system, config, "RegTest");
+ ActorGateway me =
+ TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+ // start the resource manager
+ ActorGateway resourceManager =
+ TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+ // notify about a resource manager registration at the job manager
+ resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+ // Wait for resource manager
+ expectMsgEquals(Messages.getAcknowledge());
+
+ // start task manager and wait for registration (the returned gateway itself is not used afterwards)
+ ActorGateway taskManager =
+ TestingUtils.createTaskManager(system, jobManager.actor(), config, true, true);
+
+ // check if we registered the task manager resource
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me);
+
+ TestingResourceManager.GetRegisteredResourcesReply reply =
+ expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(1, reply.resources.size());
+
+ }};
+ }};
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
new file mode 100644
index 0000000..ca8a07a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * General tests for the resource manager component.
+ */
+public class ResourceManagerTest {
+
+ private static ActorSystem system;
+
+ private static ActorGateway fakeJobManager;
+ private static ActorGateway resourceManager;
+
+ private static Configuration config = new Configuration();
+
+ @BeforeClass
+ public static void setup() {
+ system = AkkaUtils.createLocalActorSystem(config);
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ /**
+ * Tests the registration and reconciliation of the ResourceManager with the JobManager
+ */
+ @Test
+ public void testJobManagerRegistrationAndReconciliation() {
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+ fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+ resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+ expectMsgClass(RegisterResourceManager.class);
+
+ List<ResourceID> resourceList = new ArrayList<>();
+ resourceList.add(ResourceID.generate());
+ resourceList.add(ResourceID.generate());
+ resourceList.add(ResourceID.generate());
+
+ resourceManager.tell(
+ new RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList),
+ fakeJobManager);
+
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ TestingResourceManager.GetRegisteredResourcesReply reply =
+ expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ for (ResourceID id : resourceList) {
+ if (!reply.resources.contains(id)) {
+ fail("Expected to find all resources that were provided during registration.");
+ }
+ }
+ }};
+ }};
+ }
+
+ /**
+ * Tests delayed or erroneous registration of the ResourceManager with the JobManager
+ */
+ @Test
+ public void testDelayedJobManagerRegistration() {
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+
+ // set a short timeout for lookups
+ Configuration shortTimeoutConfig = config.clone();
+ shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
+
+ fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+ resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
+
+ // wait for registration message
+ RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
+ // give wrong response
+ getLastSender().tell(new JobManagerMessages.LeaderSessionMessage(null, new Object()),
+ fakeJobManager.actor());
+
+ // expect another retry and let it time out
+ expectMsgClass(RegisterResourceManager.class);
+
+ // wait for next try after timeout
+ expectMsgClass(RegisterResourceManager.class);
+
+ }};
+ }};
+ }
+
+ @Test
+ public void testTriggerReconnect() {
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+
+ // set a long timeout for lookups such that the test fails in case of timeouts
+ Configuration shortTimeoutConfig = config.clone();
+ shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
+
+ fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+ resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), shortTimeoutConfig);
+
+ // wait for registration message
+ RegisterResourceManager msg = expectMsgClass(RegisterResourceManager.class);
+ // all went well
+ resourceManager.tell(
+ new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+ fakeJobManager);
+
+ // force a reconnect
+ resourceManager.tell(
+ new TriggerRegistrationAtJobManager(fakeJobManager.actor()),
+ fakeJobManager);
+
+ // new registration attempt should come in
+ expectMsgClass(RegisterResourceManager.class);
+
+ }};
+ }};
+ }
+
+ /**
+ * Tests the registration and accounting of resources at the ResourceManager.
+ */
+ @Test
+ public void testTaskManagerRegistration() {
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+
+ fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+ resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+ // register with JM
+ expectMsgClass(RegisterResourceManager.class);
+ resourceManager.tell(
+ new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+ fakeJobManager);
+
+ ResourceID resourceID = ResourceID.generate();
+
+ // Send task manager registration
+ resourceManager.tell(new NotifyResourceStarted(resourceID),
+ fakeJobManager);
+
+ expectMsgClass(Acknowledge.class);
+
+ // check for number registration of registered resources
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ TestingResourceManager.GetRegisteredResourcesReply reply =
+ expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(1, reply.resources.size());
+
+ // Send task manager registration again
+ resourceManager.tell(new NotifyResourceStarted(resourceID),
+ fakeJobManager);
+
+ expectMsgClass(Acknowledge.class);
+
+ // check for number registration of registered resources
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(1, reply.resources.size());
+
+ // Send invalid null resource id to throw an exception during resource registration
+ resourceManager.tell(new NotifyResourceStarted(null),
+ fakeJobManager);
+
+ expectMsgClass(Acknowledge.class);
+
+ // check for number registration of registered resources
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(1, reply.resources.size());
+ }};
+ }};
+ }
+
+ @Test
+ public void testResourceRemoval() {
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+
+ fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+ resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+ // register with JM
+ expectMsgClass(RegisterResourceManager.class);
+ resourceManager.tell(
+ new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+ fakeJobManager);
+
+ ResourceID resourceID = ResourceID.generate();
+
+ // remove unknown resource
+ resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
+
+ // Send task manager registration
+ resourceManager.tell(new NotifyResourceStarted(resourceID),
+ fakeJobManager);
+
+ expectMsgClass(Acknowledge.class);
+
+ // check for number registration of registered resources
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ TestingResourceManager.GetRegisteredResourcesReply reply =
+ expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(1, reply.resources.size());
+ assertTrue(reply.resources.contains(resourceID));
+
+ // remove resource
+ resourceManager.tell(new RemoveResource(resourceID), fakeJobManager);
+
+ // check for number registration of registered resources
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ reply = expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(0, reply.resources.size());
+
+ }};
+ }};
+ }
+
+ /**
+ * Tests notification of JobManager about a failed resource.
+ */
+ @Test
+ public void testResourceFailureNotification() {
+ new JavaTestKit(system){{
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+
+ fakeJobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+ resourceManager = TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
+
+ // register with JM
+ expectMsgClass(RegisterResourceManager.class);
+ resourceManager.tell(
+ new RegisterResourceManagerSuccessful(fakeJobManager.actor(), Collections.<ResourceID>emptyList()),
+ fakeJobManager);
+
+ ResourceID resourceID1 = ResourceID.generate();
+ ResourceID resourceID2 = ResourceID.generate();
+
+ // Send task manager registration
+ resourceManager.tell(new NotifyResourceStarted(resourceID1),
+ fakeJobManager);
+
+ expectMsgClass(Acknowledge.class);
+
+ // Send task manager registration
+ resourceManager.tell(new NotifyResourceStarted(resourceID2),
+ fakeJobManager);
+
+ expectMsgClass(Acknowledge.class);
+
+ // check for number registration of registered resources
+ resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager);
+ TestingResourceManager.GetRegisteredResourcesReply reply =
+ expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
+
+ assertEquals(2, reply.resources.size());
+ assertTrue(reply.resources.contains(resourceID1));
+ assertTrue(reply.resources.contains(resourceID2));
+
+ // fail resources
+ resourceManager.tell(new TestingResourceManager.FailResource(resourceID1), fakeJobManager);
+ resourceManager.tell(new TestingResourceManager.FailResource(resourceID2), fakeJobManager);
+
+ ResourceRemoved answer = expectMsgClass(ResourceRemoved.class);
+ ResourceRemoved answer2 = expectMsgClass(ResourceRemoved.class);
+
+ assertEquals(resourceID1, answer.resourceId());
+ assertEquals(resourceID2, answer2.resourceId());
+
+ }};
+ }};
+ }
+}