You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:30 UTC
[06/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index f074f3c..67e4700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -31,15 +31,12 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -62,29 +59,30 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ChannelManager;
-import org.apache.flink.runtime.io.network.InsufficientResourcesException;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.net.NetUtils;
@@ -95,10 +93,9 @@ import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.util.SerializableArrayList;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
/**
@@ -106,7 +103,6 @@ import org.apache.flink.util.StringUtils;
* (or in case of an execution error) it reports the execution result back to the job manager.
* Task managers are able to automatically discover the job manager and receive its configuration from it
* as long as the job manager is running on the same local network
- *
*/
public class TaskManager implements TaskOperationProtocol {
@@ -114,10 +110,6 @@ public class TaskManager implements TaskOperationProtocol {
private static final int STARTUP_FAILURE_RETURN_CODE = 1;
- private static final int CRITICAL_ERROR_RETURN_CODE = 2;
-
- private static final int IPC_HANDLER_COUNT = 1;
-
private static final int MAX_LOST_HEART_BEATS = 3;
private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
@@ -127,42 +119,36 @@ public class TaskManager implements TaskOperationProtocol {
// --------------------------------------------------------------------------------------------
+ private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
+
+
private final InstanceConnectionInfo localInstanceConnectionInfo;
private final HardwareDescription hardwareDescription;
private final ExecutionMode executionMode;
+
private final JobManagerProtocol jobManager;
private final InputSplitProviderProtocol globalInputSplitProvider;
private final ChannelLookupProtocol lookupService;
-
- private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
private final AccumulatorProtocol accumulatorProtocolProxy;
+
private final Server taskManagerServer;
private final FileCache fileCache = new FileCache();
- /**
- * This map contains all the tasks whose threads are in a state other than TERMINATED. If any task
- * is stored inside this map and its thread status is TERMINATED, this indicates a virtual machine error.
- * As a result, task status will switch to FAILED and reported to the {@link org.apache.flink.runtime.jobmanager.JobManager}.
- */
- private final Map<ExecutionVertexID, Task> runningTasks = new ConcurrentHashMap<ExecutionVertexID, Task>();
+ /** All currently running tasks */
+ private final ConcurrentHashMap<ExecutionAttemptID, Task> runningTasks = new ConcurrentHashMap<ExecutionAttemptID, Task>();
- /**
- * The instance of the {@link ChannelManager} which is responsible for
- * setting up and cleaning up the byte buffered channels of the tasks.
- */
+ /** The {@link ChannelManager} sets up and cleans up the data exchange channels of the tasks. */
private final ChannelManager channelManager;
- /**
- * Instance of the task manager profile if profiling is enabled.
- */
+ /** Instance of the task manager profile if profiling is enabled. */
private final TaskManagerProfiler profiler;
private final MemoryManager memoryManager;
@@ -181,50 +167,39 @@ public class TaskManager implements TaskOperationProtocol {
private volatile boolean shutdownComplete;
- /**
- * All parameters are obtained from the
- * {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
- */
- public TaskManager(ExecutionMode executionMode) throws Exception {
- if (executionMode == null) {
- throw new NullPointerException("Execution mode must not be null.");
+ // --------------------------------------------------------------------------------------------
+ // Constructor & Shutdown
+ // --------------------------------------------------------------------------------------------
+
+ public TaskManager(ExecutionMode executionMode, JobManagerProtocol jobManager, InputSplitProviderProtocol splitProvider,
+ ChannelLookupProtocol channelLookup, AccumulatorProtocol accumulators,
+ InetSocketAddress jobManagerAddress, InetAddress taskManagerBindAddress)
+ throws Exception
+ {
+ if (executionMode == null || jobManager == null || splitProvider == null || channelLookup == null || accumulators == null) {
+ throw new NullPointerException();
}
- LOG.info("Execution mode: " + executionMode);
+ LOG.info("TaskManager execution mode: " + executionMode);
+
this.executionMode = executionMode;
-
- // IMPORTANT! At this point, the GlobalConfiguration must have been read!
+ this.jobManager = jobManager;
+ this.lookupService = channelLookup;
+ this.globalInputSplitProvider = splitProvider;
+ this.accumulatorProtocolProxy = accumulators;
- final InetSocketAddress jobManagerAddress;
+ // initialize the number of slots
{
- LOG.info("Reading location of job manager from configuration");
-
- final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
- if (address == null) {
- throw new Exception("Job manager address not configured in the GlobalConfiguration.");
- }
-
- // Try to convert configured address to {@link InetAddress}
- try {
- final InetAddress tmpAddress = InetAddress.getByName(address);
- jobManagerAddress = new InetSocketAddress(tmpAddress, port);
- }
- catch (UnknownHostException e) {
- LOG.fatal("Could not resolve JobManager host name.");
- throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
+ int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
+ if (slots == -1) {
+ slots = 1;
+ LOG.info("Number of task slots not configured. Creating one task slot.");
+ } else if (slots <= 0) {
+ throw new Exception("Illegal value for the number of task slots: " + slots);
+ } else {
+ LOG.info("Creating " + slots + " task slot(s).");
}
-
- LOG.info("Connecting to JobManager at: " + jobManagerAddress);
- }
-
- // Create RPC connection to the JobManager
- try {
- this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
- } catch (IOException e) {
- LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
- throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
+ this.numberOfSlots = slots;
}
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
@@ -236,52 +211,18 @@ public class TaskManager implements TaskOperationProtocol {
dataPort = getAvailablePort();
}
- // Determine our own public facing address and start the server
- {
- final InetAddress taskManagerAddress;
- try {
- taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
- }
- catch (Exception e) {
- throw new RuntimeException("The TaskManager failed to connect to the JobManager.", e);
- }
-
- this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
- LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
-
- // Start local RPC server
- try {
- this.taskManagerServer = RPC.getServer(this, taskManagerAddress.getHostAddress(), ipcPort, IPC_HANDLER_COUNT);
- this.taskManagerServer.start();
- } catch (IOException e) {
- LOG.fatal("Failed to start TaskManager server. " + e.getMessage(), e);
- throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
- }
- }
-
- // Try to create local stub of the global input split provider
- try {
- this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
- } catch (IOException e) {
- LOG.fatal(e.getMessage(), e);
- throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
- }
-
- // Try to create local stub for the lookup service
- try {
- this.lookupService = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
- } catch (IOException e) {
- LOG.fatal(e.getMessage(), e);
- throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
- }
+ this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerBindAddress, ipcPort, dataPort);
+ LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
- // Try to create local stub for the accumulators
+ // Start local RPC server, give it the number of threads as we have slots
try {
- this.accumulatorProtocolProxy = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+ this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numberOfSlots);
+ this.taskManagerServer.start();
} catch (IOException e) {
- LOG.fatal("Failed to initialize accumulator protocol: " + e.getMessage(), e);
- throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+ LOG.fatal("Failed to start TaskManager server. " + e.getMessage(), e);
+ throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
}
+
// Load profiler if it should be used
if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
@@ -348,20 +289,6 @@ public class TaskManager implements TaskOperationProtocol {
LOG.error(StringUtils.stringifyException(ioe));
throw new Exception("Failed to instantiate ChannelManager.", ioe);
}
-
- // initialize the number of slots
- {
- int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
- if (slots == -1) {
- slots = 1;
- LOG.info("Number of task slots not configured. Creating one task slot.");
- } else if (slots <= 0) {
- throw new Exception("Illegal value for the number of task slots: " + slots);
- } else {
- LOG.info("Creating " + slots + " task slot(s).");
- }
- this.numberOfSlots = slots;
- }
// initialize the memory manager
{
@@ -457,626 +384,328 @@ public class TaskManager implements TaskOperationProtocol {
}).start();
}
}
-
- private int getAvailablePort() {
- ServerSocket serverSocket = null;
- int port = 0;
- for (int i = 0; i < 50; i++){
- try {
- serverSocket = new ServerSocket(0);
- port = serverSocket.getLocalPort();
- if (port != 0) {
- serverSocket.close();
- break;
- }
- } catch (IOException e) {
- LOG.debug("Unable to allocate port " + e.getMessage(), e);
- }
- }
- if (!serverSocket.isClosed()) {
- try {
- serverSocket.close();
- } catch (IOException e) {
- LOG.debug("error closing port",e);
- }
- }
- return port;
- }
-
+
/**
- * Entry point for the program.
- *
- * @param args
- * arguments from the command line
- * @throws IOException
+ * Shuts the task manager down.
*/
- @SuppressWarnings("static-access")
- public static void main(String[] args) throws IOException {
- Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription(
- "Specify configuration directory.").create("configDir");
- // tempDir option is used by the YARN client.
- Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)")
- .hasArg().withDescription(
- "Specify temporary directory.").create(ARG_CONF_DIR);
- configDirOpt.setRequired(true);
- tempDir.setRequired(false);
- Options options = new Options();
- options.addOption(configDirOpt);
- options.addOption(tempDir);
+ public void shutdown() {
+ if (!this.shutdownStarted.compareAndSet(false, true)) {
+ return;
+ }
+
+ LOG.info("Shutting down TaskManager");
+ // first, stop the heartbeat thread and wait for it to terminate
+ this.heartbeatThread.interrupt();
+ try {
+ this.heartbeatThread.join(1000);
+ } catch (InterruptedException e) {}
- CommandLineParser parser = new GnuParser();
- CommandLine line = null;
+ // Stop RPC proxy for the task manager
+ stopProxy(this.jobManager);
+
+ // Stop RPC proxy for the global input split assigner
+ stopProxy(this.globalInputSplitProvider);
+
+ // Stop RPC proxy for the lookup service
+ stopProxy(this.lookupService);
+
+ // Stop RPC proxy for accumulator reports
+ stopProxy(this.accumulatorProtocolProxy);
+
+ // Shut down the own RPC server
try {
- line = parser.parse(options, args);
- } catch (ParseException e) {
- System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
- System.exit(STARTUP_FAILURE_RETURN_CODE);
+ this.taskManagerServer.stop();
+ } catch (Throwable t) {
+ LOG.warn("TaskManager RPC server did not shut down properly.", t);
}
- String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
- String tempDirVal = line.getOptionValue(tempDir.getOpt(), null);
-
- // First, try to load global configuration
- GlobalConfiguration.loadConfiguration(configDir);
- if(tempDirVal != null // the YARN TM runner has set a value for the temp dir
- // the configuration does not contain a temp direcory
- && GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) {
- Configuration c = GlobalConfiguration.getConfiguration();
- c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal);
- LOG.info("Setting temporary directory to "+tempDirVal);
- GlobalConfiguration.includeConfiguration(c);
+ // Stop profiling if enabled
+ if (this.profiler != null) {
+ this.profiler.shutdown();
}
-
- // print some startup environment info, like user, code revision, etc
- EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
-
- // Create a new task manager object
+
+ // Shut down the channel manager
try {
- new TaskManager(ExecutionMode.CLUSTER);
- } catch (Exception e) {
- LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
- System.exit(STARTUP_FAILURE_RETURN_CODE);
+ this.channelManager.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("ChannelManager did not shutdown properly: " + t.getMessage(), t);
}
-
- // park the main thread to keep the JVM alive (all other threads may be daemon threads)
- Object mon = new Object();
- synchronized (mon) {
- try {
- mon.wait();
- } catch (InterruptedException ex) {}
+
+ // Shut down the memory manager
+ if (this.ioManager != null) {
+ this.ioManager.shutdown();
+ }
+
+ if (this.memoryManager != null) {
+ this.memoryManager.shutdown();
}
- }
+ this.fileCache.shutdown();
+ // Shut down the executor service
+ if (this.executorService != null) {
+ this.executorService.shutdown();
+ try {
+ this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.debug(e);
+ }
+ }
+ this.shutdownComplete = true;
+ }
/**
- * The states of address detection mechanism.
- * There is only a state transition if the current state failed to determine the address.
+ * Checks whether the task manager has already been shut down.
+ *
+ * @return <code>true</code> if the task manager has already been shut down, <code>false</code> otherwise
*/
- private enum AddressDetectionState {
- ADDRESS(50), //detect own IP based on the JobManagers IP address. Look for common prefix
- FAST_CONNECT(50), //try to connect to the JobManager on all Interfaces and all their addresses.
- //this state uses a low timeout (say 50 ms) for fast detection.
- SLOW_CONNECT(1000); //same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
-
-
- private int timeout;
- AddressDetectionState(int timeout) {
- this.timeout = timeout;
- }
- public int getTimeout() {
- return timeout;
- }
+ public boolean isShutDown() {
+ return this.shutdownComplete;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ public InstanceConnectionInfo getConnectionInfo() {
+ return this.localInstanceConnectionInfo;
+ }
+
+ public ExecutionMode getExecutionMode() {
+ return this.executionMode;
+ }
+
+ /**
+ * Gets the ID under which the TaskManager is currently registered at its JobManager.
+ * If the TaskManager has not been registered, yet, or if it lost contact, this is is null.
+ *
+ * @return The ID under which the TaskManager is currently registered.
+ */
+ public InstanceID getRegisteredId() {
+ return this.registeredId;
}
/**
- * Find out the TaskManager's own IP address.
+ * Checks if the TaskManager is properly registered and ready to receive work.
+ *
+ * @return True, if the TaskManager is registered, false otherwise.
*/
- private InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
- AddressDetectionState strategy = AddressDetectionState.ADDRESS;
-
- while (true) {
- Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
- while (e.hasMoreElements()) {
- NetworkInterface n = e.nextElement();
- Enumeration<InetAddress> ee = n.getInetAddresses();
- while (ee.hasMoreElements()) {
- InetAddress i = ee.nextElement();
- switch (strategy) {
- case ADDRESS:
- if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
- if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
- LOG.info("Determined " + i + " as the TaskTracker's own IP address");
- return i;
- }
- }
- break;
- case FAST_CONNECT:
- case SLOW_CONNECT:
- boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
- if (correct) {
- LOG.info("Determined " + i + " as the TaskTracker's own IP address");
- return i;
- }
- break;
- default:
- throw new RuntimeException("Unkown address detection strategy: " + strategy);
- }
- }
- }
- // state control
- switch (strategy) {
- case ADDRESS:
- strategy = AddressDetectionState.FAST_CONNECT;
- break;
- case FAST_CONNECT:
- strategy = AddressDetectionState.SLOW_CONNECT;
- break;
- case SLOW_CONNECT:
- throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Defaulting to detection strategy " + strategy);
- }
- }
- }
-
- /**
- * Checks if two addresses have a common prefix (first 2 bytes).
- * Example: 192.168.???.???
- * Works also with ipv6, but accepts probably too many addresses
- */
- private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
- return address[0] == address2[0] && address[1] == address2[1];
- }
-
- public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
- + " with timeout " + timeout);
- }
- boolean connectable = true;
- Socket socket = null;
- try {
- socket = new Socket();
- SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
- // machine
- socket.bind(bindP);
- socket.connect(toSocket, timeout);
- } catch (Exception ex) {
- LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed with exception", ex);
- }
- connectable = false;
- } finally {
- if (socket != null) {
- socket.close();
- }
- }
- return connectable;
- }
-
+ public boolean isRegistered() {
+ return this.registeredId != null;
+ }
+
+ public Map<ExecutionAttemptID, Task> getAllRunningTasks() {
+ return Collections.unmodifiableMap(this.runningTasks);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Task Operation
+ // --------------------------------------------------------------------------------------------
@Override
- public TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
+ public TaskOperationResult cancelTask(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId) throws IOException {
- final Task task = this.runningTasks.get(id);
+ final Task task = this.runningTasks.get(executionId);
if (task == null) {
- final TaskCancelResult taskCancelResult = new TaskCancelResult(id,
- AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
- taskCancelResult.setDescription("No task with ID " + id + " is currently running");
- return taskCancelResult;
+ return new TaskOperationResult(vertexId, subtaskIndex, executionId, false, "No task with that execution ID was found.");
}
// Pass call to executor service so IPC thread can return immediately
final Runnable r = new Runnable() {
-
@Override
public void run() {
-
- // Finally, request user code to cancel
task.cancelExecution();
}
};
-
this.executorService.execute(r);
- return new TaskCancelResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
- }
-
- @Override
- public TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
- final Task task = this.runningTasks.get(id);
-
- if (task == null) {
- final TaskKillResult taskKillResult = new TaskKillResult(id,
- AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
- taskKillResult.setDescription("No task with ID + " + id + " is currently running");
- return taskKillResult;
- }
-
- // Pass call to executor service so IPC thread can return immediately
- final Runnable r = new Runnable() {
-
- @Override
- public void run() {
-
- // Finally, request user code to cancel
- task.killExecution();
- }
- };
-
- this.executorService.execute(r);
-
- return new TaskKillResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
+ // return success
+ return new TaskOperationResult(vertexId, subtaskIndex, executionId, true);
}
@Override
- public List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks) throws IOException {
-
- final List<TaskSubmissionResult> submissionResultList = new SerializableArrayList<TaskSubmissionResult>();
- final List<Task> tasksToStart = new ArrayList<Task>();
-
- // Make sure all tasks are fully registered before they are started
- for (final TaskDeploymentDescriptor tdd : tasks) {
-
- final JobID jobID = tdd.getJobID();
- final ExecutionVertexID vertexID = tdd.getVertexID();
- RuntimeEnvironment re;
-
- // retrieve the registered cache files from job configuration and create the local tmp file.
- Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
- for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
- FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
- cpTasks.put(e.getKey(), cp);
+ public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) {
+ final JobID jobID = tdd.getJobID();
+ final JobVertexID vertexId = tdd.getVertexID();
+ final ExecutionAttemptID executionId = tdd.getExecutionId();
+ final int taskIndex = tdd.getIndexInSubtaskGroup();
+ final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
+
+ try {
+ final ClassLoader userCodeClassLoader = LibraryCacheManager.getClassLoader(jobID);
+ if (userCodeClassLoader == null) {
+ throw new Exception("No user code ClassLoader available.");
}
-
- try {
- re = new RuntimeEnvironment(tdd, this.memoryManager, this.ioManager, new TaskInputSplitProvider(jobID,
- vertexID, this.globalInputSplitProvider), this.accumulatorProtocolProxy, cpTasks);
- } catch (Throwable t) {
- final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
- AbstractTaskResult.ReturnCode.DEPLOYMENT_ERROR);
- result.setDescription(StringUtils.stringifyException(t));
- LOG.error(result.getDescription(), t);
- submissionResultList.add(result);
- continue;
+
+ final Task task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(), this);
+ if (this.runningTasks.putIfAbsent(executionId, task) != null) {
+ throw new Exception("TaskManager contains already a task with executionId " + executionId);
}
-
- final Configuration jobConfiguration = tdd.getJobConfiguration();
-
- // Register the task
- Task task;
+
+ // another try/finally-success block to ensure that the tasks are removed properly in case of an exception
+ boolean success = false;
try {
- task = createAndRegisterTask(vertexID, jobConfiguration, re);
- } catch (InsufficientResourcesException e) {
- final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
- AbstractTaskResult.ReturnCode.INSUFFICIENT_RESOURCES);
- result.setDescription(e.getMessage());
- LOG.error(result.getDescription(), e);
- submissionResultList.add(result);
- continue;
- }
-
- if (task == null) {
- final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
- AbstractTaskResult.ReturnCode.TASK_NOT_FOUND);
- result.setDescription("Task " + re.getTaskNameWithIndex() + " (" + vertexID + ") was already running");
- LOG.error(result.getDescription());
- submissionResultList.add(result);
- continue;
- }
-
- submissionResultList.add(new TaskSubmissionResult(vertexID, AbstractTaskResult.ReturnCode.SUCCESS));
- tasksToStart.add(task);
- }
-
- // Now start the tasks
- for (final Task task : tasksToStart) {
- task.startExecution();
- }
-
- return submissionResultList;
- }
-
- /**
- * Registers an newly incoming runtime task with the task manager.
- *
- * @param id
- * the ID of the task to register
- * @param jobConfiguration
- * the job configuration that has been attached to the original job graph
- * @param environment
- * the environment of the task to be registered
- * @return the task to be started or <code>null</code> if a task with the same ID was already running
- */
- private Task createAndRegisterTask(final ExecutionVertexID id, final Configuration jobConfiguration,
- final RuntimeEnvironment environment)
- throws InsufficientResourcesException, IOException {
-
- if (id == null) {
- throw new IllegalArgumentException("Argument id is null");
- }
-
- if (environment == null) {
- throw new IllegalArgumentException("Argument environment is null");
- }
-
- // Task creation and registration must be atomic
- Task task;
-
- synchronized (this) {
- final Task runningTask = this.runningTasks.get(id);
- boolean registerTask = true;
- if (runningTask == null) {
- task = new Task(id, environment, this);
- } else {
-
- if (runningTask instanceof Task) {
- // Task is already running
- return null;
- } else {
- // There is already a replay task running, we will simply restart it
- task = runningTask;
- registerTask = false;
- }
-
- }
-
- if (registerTask) {
- // Register the task with the byte buffered channel manager
+ final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId);
+ final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
+ task.setEnvironment(env);
+
+ // register the task with the network stack and profilers
this.channelManager.register(task);
-
- boolean enableProfiling = false;
- if (this.profiler != null && jobConfiguration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
- enableProfiling = true;
- }
-
+
+ final Configuration jobConfig = tdd.getJobConfiguration();
+
+ boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true);
+
// Register environment, input, and output gates for profiling
if (enableProfiling) {
- task.registerProfiler(this.profiler, jobConfiguration);
+ task.registerProfiler(this.profiler, jobConfig);
}
-
- this.runningTasks.put(id, task);
- }
- }
- return task;
- }
-
- /**
- * Unregisters a finished or aborted task.
- *
- * @param id
- * the ID of the task to be unregistered
- */
- private void unregisterTask(final ExecutionVertexID id) {
-
- // Task de-registration must be atomic
- synchronized (this) {
-
- final Task task = this.runningTasks.remove(id);
- if (task == null) {
- LOG.error("Cannot find task with ID " + id + " to unregister");
- return;
- }
-
- // remove the local tmp file for unregistered tasks.
- for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
- this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
- }
- // Unregister task from the byte buffered channel manager
- this.channelManager.unregister(id, task);
-
- // Unregister task from profiling
- task.unregisterProfiler(this.profiler);
-
- // Unregister task from memory manager
- task.unregisterMemoryManager(this.memoryManager);
-
- // Unregister task from library cache manager
- try {
- LibraryCacheManager.unregister(task.getJobID());
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistering the job vertex ID " + id + " caused an IOException");
+
+ // now that the task is successfully created and registered, we can start copying the
+ // distributed cache temp files
+ Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
+ for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
+ FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
+ cpTasks.put(e.getKey(), cp);
+ }
+ env.addCopyTasksForCacheFile(cpTasks);
+
+ if (!task.startExecution()) {
+ throw new Exception("Cannot start task. Task was canceled or failed.");
}
+
+ success = true;
+ return new TaskOperationResult(vertexId, taskIndex, executionId, true);
}
- }
- }
-
-
- @Override
- public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException {
-
- LibraryCacheProfileResponse response = new LibraryCacheProfileResponse(request);
- String[] requiredLibraries = request.getRequiredLibraries();
-
- for (int i = 0; i < requiredLibraries.length; i++) {
- if (LibraryCacheManager.contains(requiredLibraries[i]) == null) {
- response.setCached(i, false);
- } else {
- response.setCached(i, true);
+ finally {
+ if (!success) {
+ // remove task
+ this.runningTasks.remove(executionId);
+ // delete distributed cache files
+ for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
+ this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), jobID);
+ }
+ }
}
}
-
- return response;
- }
-
-
- @Override
- public void updateLibraryCache(LibraryCacheUpdate update) throws IOException {
- // Nothing to to here
- }
-
- public void executionStateChanged(final JobID jobID, final ExecutionVertexID id,
- final ExecutionState newExecutionState, final String optionalDescription) {
-
- // Don't propagate state CANCELING back to the job manager
- if (newExecutionState == ExecutionState.CANCELING) {
- return;
- }
-
- if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
- || newExecutionState == ExecutionState.FAILED) {
-
- // Unregister the task (free all buffers, remove all channels, task-specific class loaders, etc...)
- unregisterTask(id);
- }
- // Get lock on the jobManager object and propagate the state change
- synchronized (this.jobManager) {
+ catch (Throwable t) {
+ LOG.error("Could not instantiate task", t);
+
try {
- this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, id, newExecutionState,
- optionalDescription));
+ LibraryCacheManager.unregister(jobID);
} catch (IOException e) {
- LOG.error(e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+ }
}
+
+ return new TaskOperationResult(vertexId, taskIndex, executionId, false, ExceptionUtils.stringifyException(t));
}
}
/**
- * Shuts the task manager down.
+ * Unregisters a finished or aborted task.
+ *
+ * @param executionId
+ * the ID of the task to be unregistered
*/
- public void shutdown() {
+ private void unregisterTask(ExecutionAttemptID executionId) {
- if (!this.shutdownStarted.compareAndSet(false, true)) {
+ // Task de-registration must be atomic
+ final Task task = this.runningTasks.remove(executionId);
+ if (task == null) {
+ LOG.error("Cannot find task with ID " + executionId + " to unregister");
return;
}
- LOG.info("Shutting down TaskManager");
+ // remove the local tmp file for unregistered tasks.
+ for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration())) {
+ this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
+ }
- // first, stop the heartbeat thread and wait for it to terminate
- this.heartbeatThread.interrupt();
- try {
- this.heartbeatThread.join(1000);
- } catch (InterruptedException e) {}
-
- // Stop RPC proxy for the task manager
- RPC.stopProxy(this.jobManager);
-
- // Stop RPC proxy for the global input split assigner
- RPC.stopProxy(this.globalInputSplitProvider);
+ // Unregister task from the byte buffered channel manager
+ this.channelManager.unregister(executionId, task);
- // Stop RPC proxy for the lookup service
- RPC.stopProxy(this.lookupService);
+ // Unregister task from profiling
+ task.unregisterProfiler(this.profiler);
- // Stop RPC proxy for accumulator reports
- RPC.stopProxy(this.accumulatorProtocolProxy);
-
- // Shut down the own RPC server
- this.taskManagerServer.stop();
+ // Unregister task from memory manager
+ task.unregisterMemoryManager(this.memoryManager);
- // Stop profiling if enabled
- if (this.profiler != null) {
- this.profiler.shutdown();
- }
-
- // Shut down the channel manager
+ // Unregister task from library cache manager
try {
- this.channelManager.shutdown();
+ LibraryCacheManager.unregister(task.getJobID());
} catch (IOException e) {
- LOG.warn("ChannelManager did not shutdown properly: " + e.getMessage(), e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+ }
}
+ }
- // Shut down the memory manager
- if (this.ioManager != null) {
- this.ioManager.shutdown();
+ public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState2 newExecutionState, String optionalDescription) {
+
+ // Get lock on the jobManager object and propagate the state change
+ boolean success = false;
+ try {
+ this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionId, newExecutionState, optionalDescription));
}
-
- if (this.memoryManager != null) {
- this.memoryManager.shutdown();
+ catch (Throwable t) {
+ String msg = "Error sending task state update to JobManager.";
+ LOG.error(msg, t);
+ ExceptionUtils.rethrow(t, msg);
}
-
- this.fileCache.shutdown();
-
- // Shut down the executor service
- if (this.executorService != null) {
- this.executorService.shutdown();
- try {
- this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(e);
- }
+ finally {
+ // in case of a failure, or when the tasks is in a finished state, then unregister the
+ // task (free all buffers, remove all channels, task-specific class loaders, etc...)
+ if (!success || newExecutionState == ExecutionState2.FINISHED || newExecutionState == ExecutionState2.CANCELED
+ || newExecutionState == ExecutionState2.FAILED)
+ {
+
+ unregisterTask(executionId);
}
}
-
- this.shutdownComplete = true;
}
/**
- * Checks whether the task manager has already been shut down.
- *
- * @return <code>true</code> if the task manager has already been shut down, <code>false</code> otherwise
+ * Removes all tasks from this TaskManager.
*/
- public boolean isShutDown() {
- return this.shutdownComplete;
- }
-
- @Override
- public void logBufferUtilization() {
- this.channelManager.logBufferUtilization();
- }
-
- @Override
- public void killTaskManager() throws IOException {
- // Kill the entire JVM after a delay of 10ms, so this RPC will finish properly before
- final Timer timer = new Timer();
- final TimerTask timerTask = new TimerTask() {
-
- @Override
- public void run() {
- System.exit(0);
- }
- };
-
- timer.schedule(timerTask, 10L);
- }
-
-
- @Override
- public void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
- this.channelManager.invalidateLookupCacheEntries(channelIDs);
- }
-
public void cancelAndClearEverything() {
LOG.info("Cancelling all computations and discarding all cached data.");
+ for (Task t : runningTasks.values()) {
+ t.cancelExecution();
+ runningTasks.remove(t.getExecutionId());
+ }
}
// --------------------------------------------------------------------------------------------
- // Properties
+ // Library caching
// --------------------------------------------------------------------------------------------
- public InstanceConnectionInfo getConnectionInfo() {
- return this.localInstanceConnectionInfo;
- }
-
- public ExecutionMode getExecutionMode() {
- return this.executionMode;
- }
-
- /**
- * Gets the ID under which the TaskManager is currently registered at its JobManager.
- * If the TaskManager has not been registered, yet, or if it lost contact, this is is null.
- *
- * @return The ID under which the TaskManager is currently registered.
- */
- public InstanceID getRegisteredId() {
- return this.registeredId;
+ @Override
+ public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException {
+
+ LibraryCacheProfileResponse response = new LibraryCacheProfileResponse(request);
+ String[] requiredLibraries = request.getRequiredLibraries();
+
+ for (int i = 0; i < requiredLibraries.length; i++) {
+ if (LibraryCacheManager.contains(requiredLibraries[i]) == null) {
+ response.setCached(i, false);
+ } else {
+ response.setCached(i, true);
+ }
+ }
+
+ return response;
}
- /**
- * Checks if the TaskManager is properly registered and ready to receive work.
- *
- * @return True, if the TaskManager is registered, false otherwise.
- */
- public boolean isRegistered() {
- return this.registeredId != null;
+ @Override
+ public void updateLibraryCache(LibraryCacheUpdate update) throws IOException {
+ // Nothing to do here, because the libraries are added to the cache when the
+ // update is deserialized (WE SHOULD CHANGE THAT!!!)
}
// --------------------------------------------------------------------------------------------
@@ -1243,7 +872,187 @@ public class TaskManager implements TaskOperationProtocol {
return str.toString();
}
+
+
+ // --------------------------------------------------------------------------------------------
+ // Execution & Initialization
+ // --------------------------------------------------------------------------------------------
+
+ public static TaskManager createTaskManager(ExecutionMode mode) throws Exception {
+
+ // IMPORTANT! At this point, the GlobalConfiguration must have been read!
+
+ final InetSocketAddress jobManagerAddress;
+ LOG.info("Reading location of job manager from configuration");
+
+ final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+ final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+ if (address == null) {
+ throw new Exception("Job manager address not configured in the GlobalConfiguration.");
+ }
+
+ // Try to convert configured address to {@link InetAddress}
+ try {
+ final InetAddress tmpAddress = InetAddress.getByName(address);
+ jobManagerAddress = new InetSocketAddress(tmpAddress, port);
+ }
+ catch (UnknownHostException e) {
+ LOG.fatal("Could not resolve JobManager host name.");
+ throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
+ }
+
+ return createTaskManager(mode, jobManagerAddress);
+ }
+
+ public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress) throws Exception {
+ // Determine our own public facing address and start the server
+ final InetAddress taskManagerAddress;
+ try {
+ taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
+ }
+ catch (IOException e) {
+ throw new Exception("The TaskManager failed to determine the IP address of the interface that connects to the JobManager.", e);
+ }
+
+ return createTaskManager(mode, jobManagerAddress, taskManagerAddress);
+ }
+
+
+ public static TaskManager createTaskManager(ExecutionMode mode, InetSocketAddress jobManagerAddress, InetAddress taskManagerAddress) throws Exception {
+
+ // IMPORTANT! At this point, the GlobalConfiguration must have been read!
+
+ LOG.info("Connecting to JobManager at: " + jobManagerAddress);
+
+ // Create RPC connections to the JobManager
+
+ JobManagerProtocol jobManager = null;
+ InputSplitProviderProtocol splitProvider = null;
+ ChannelLookupProtocol channelLookup = null;
+ AccumulatorProtocol accumulators = null;
+
+ // try/finally block to close proxies if anything goes wrong
+ boolean success = false;
+ try {
+ // create the RPC call proxy to the job manager for jobs
+ try {
+ jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+ }
+ catch (IOException e) {
+ LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
+ throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
+ }
+
+ // Try to create local stub of the global input split provider
+ try {
+ splitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+ }
+ catch (IOException e) {
+ LOG.fatal(e.getMessage(), e);
+ throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
+ }
+
+ // Try to create local stub for the lookup service
+ try {
+ channelLookup = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+ }
+ catch (IOException e) {
+ LOG.fatal(e.getMessage(), e);
+ throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
+ }
+
+ // Try to create local stub for the accumulators
+ try {
+ accumulators = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
+ }
+ catch (IOException e) {
+ LOG.fatal("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+ throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+ }
+
+ TaskManager tm = new TaskManager(mode, jobManager, splitProvider, channelLookup, accumulators, jobManagerAddress, taskManagerAddress);
+ success = true;
+ return tm;
+ }
+ finally {
+ if (!success) {
+ stopProxy(jobManager);
+ stopProxy(splitProvider);
+ stopProxy(channelLookup);
+ stopProxy(accumulators);
+ }
+ }
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // Executable
+ // --------------------------------------------------------------------------------------------
+ /**
+ * Entry point for the TaskManager executable.
+ *
+ * @param args Arguments from the command line
+ * @throws IOException
+ */
+ @SuppressWarnings("static-access")
+ public static void main(String[] args) throws IOException {
+ Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription(
+ "Specify configuration directory.").create("configDir");
+ // tempDir option is used by the YARN client.
+ Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)")
+ .hasArg().withDescription(
+ "Specify temporary directory.").create(ARG_CONF_DIR);
+ configDirOpt.setRequired(true);
+ tempDir.setRequired(false);
+ Options options = new Options();
+ options.addOption(configDirOpt);
+ options.addOption(tempDir);
+
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine line = null;
+ try {
+ line = parser.parse(options, args);
+ } catch (ParseException e) {
+ System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
+ }
+
+ String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
+ String tempDirVal = line.getOptionValue(tempDir.getOpt(), null);
+
+ // First, try to load global configuration
+ GlobalConfiguration.loadConfiguration(configDir);
+ if(tempDirVal != null // the YARN TM runner has set a value for the temp dir
+ // the configuration does not contain a temp directory
+ && GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) {
+ Configuration c = GlobalConfiguration.getConfiguration();
+ c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal);
+ LOG.info("Setting temporary directory to "+tempDirVal);
+ GlobalConfiguration.includeConfiguration(c);
+ }
+
+ // print some startup environment info, like user, code revision, etc
+ EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
+
+ // Create a new task manager object
+ try {
+ createTaskManager(ExecutionMode.CLUSTER);
+ } catch (Exception e) {
+ LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
+ }
+
+ // park the main thread to keep the JVM alive (all other threads may be daemon threads)
+ Object mon = new Object();
+ synchronized (mon) {
+ try {
+ mon.wait();
+ } catch (InterruptedException ex) {}
+ }
+ }
// --------------------------------------------------------------------------------------------
// Miscellaneous Utilities
@@ -1268,30 +1077,169 @@ public class TaskManager implements TaskOperationProtocol {
if (!f.exists()) {
throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
}
-
if (!f.isDirectory()) {
throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
}
-
if (!f.canWrite()) {
throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
}
}
}
- public static class EmergencyShutdownExceptionHandler implements Thread.UncaughtExceptionHandler {
+ /**
+ * Stops the given RPC protocol proxy, if it is not null.
+ * This method never throws an exception, it only logs errors.
+ *
+ * @param protocol The protocol proxy to stop.
+ */
+ private static final void stopProxy(VersionedProtocol protocol) {
+ if (protocol != null) {
+ try {
+ RPC.stopProxy(protocol);
+ }
+ catch (Throwable t) {
+ LOG.error("Error while shutting down RPC proxy.", t);
+ }
+ }
+ }
+
+ /**
+ * Determines the IP address of the interface from which the TaskManager can connect to the given JobManager
+ * IP address.
+ *
+ * @param jobManagerAddress The socket address to connect to.
+ * @return The IP address of the interface that connects to the JobManager.
+ * @throws IOException If no connection could be established.
+ */
+ private static InetAddress getTaskManagerAddress(InetSocketAddress jobManagerAddress) throws IOException {
+ AddressDetectionState strategy = AddressDetectionState.ADDRESS;
- private final TaskManager tm;
-
- public EmergencyShutdownExceptionHandler(TaskManager tm) {
- this.tm = tm;
+ while (true) {
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+ while (e.hasMoreElements()) {
+ NetworkInterface n = e.nextElement();
+ Enumeration<InetAddress> ee = n.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress i = ee.nextElement();
+ switch (strategy) {
+ case ADDRESS:
+ if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
+ if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
+ LOG.info("Determined " + i + " as the TaskManager's own IP address");
+ return i;
+ }
+ }
+ break;
+ case FAST_CONNECT:
+ case SLOW_CONNECT:
+ boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
+ if (correct) {
+ LOG.info("Determined " + i + " as the TaskManager's own IP address");
+ return i;
+ }
+ break;
+ default:
+ throw new RuntimeException("Unknown address detection strategy: " + strategy);
+ }
+ }
+ }
+ // state control
+ switch (strategy) {
+ case ADDRESS:
+ strategy = AddressDetectionState.FAST_CONNECT;
+ break;
+ case FAST_CONNECT:
+ strategy = AddressDetectionState.SLOW_CONNECT;
+ break;
+ case SLOW_CONNECT:
+ throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Defaulting to detection strategy " + strategy);
+ }
+ }
+ }
+
+ /**
+ * Searches for an available free port and returns the port number.
+ *
+ * @return An available port.
+ * @throws RuntimeException Thrown, if no free port was found.
+ */
+ private static final int getAvailablePort() {
+ for (int i = 0; i < 50; i++) {
+ ServerSocket serverSocket = null;
+ try {
+ serverSocket = new ServerSocket(0);
+ int port = serverSocket.getLocalPort();
+ if (port != 0) {
+ return port;
+ }
+ } catch (IOException e) {
+ LOG.debug("Unable to allocate port " + e.getMessage(), e);
+ } finally {
+ if (serverSocket != null) {
+ try { serverSocket.close(); } catch (Throwable t) {}
+ }
+ }
}
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.fatal("Thread " + t.getName() + " caused an unrecoverable exception.", e);
- tm.shutdown();
+ throw new RuntimeException("Could not find a free permitted port on the machine.");
+ }
+
+ /**
+ * Checks if two addresses have a common prefix (first 2 bytes).
+ * Example: 192.168.???.???
+ * Works also with ipv6, but accepts probably too many addresses
+ */
+ private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
+ return address[0] == address2[0] && address[1] == address2[1];
+ }
+
+ private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+ + " with timeout " + timeout);
+ }
+ boolean connectable = true;
+ Socket socket = null;
+ try {
+ socket = new Socket();
+ SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
+ // machine
+ socket.bind(bindP);
+ socket.connect(toSocket, timeout);
+ } catch (Exception ex) {
+ LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed with exception", ex);
+ }
+ connectable = false;
+ } finally {
+ if (socket != null) {
+ socket.close();
+ }
}
+ return connectable;
+ }
+
+ /**
+ * The states of address detection mechanism.
+ * There is only a state transition if the current state failed to determine the address.
+ */
+ private enum AddressDetectionState {
+ ADDRESS(50), //detect own IP based on the JobManagers IP address. Look for common prefix
+ FAST_CONNECT(50), //try to connect to the JobManager on all Interfaces and all their addresses.
+ //this state uses a low timeout (say 50 ms) for fast detection.
+ SLOW_CONNECT(1000); //same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
+
+ private int timeout;
+ AddressDetectionState(int timeout) {
+ this.timeout = timeout;
+ }
+ public int getTimeout() {
+ return timeout;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
new file mode 100644
index 0000000..f0f00a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.types.StringValue;
+
+import com.google.common.base.Preconditions;
+
+
+public class TaskOperationResult implements IOReadableWritable {
+
+ private JobVertexID vertexId;
+
+ private int subtaskIndex;
+
+ private ExecutionAttemptID executionId;
+
+ private boolean success;
+
+ private String description;
+
+
+ public TaskOperationResult() {
+ this(new JobVertexID(), -1, new ExecutionAttemptID(), false);
+ }
+
+ public TaskOperationResult(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, boolean success) {
+ this(vertexId, subtaskIndex, executionId, success, null);
+ }
+
+ public TaskOperationResult(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, boolean success, String description) {
+ Preconditions.checkNotNull(vertexId);
+ Preconditions.checkNotNull(executionId);
+
+ this.vertexId = vertexId;
+ this.subtaskIndex = subtaskIndex;
+ this.executionId = executionId;
+ this.success = success;
+ this.description = description;
+ }
+
+
+ public JobVertexID getVertexId() {
+ return vertexId;
+ }
+
+ public int getSubtaskIndex() {
+ return subtaskIndex;
+ }
+
+ public ExecutionAttemptID getExecutionId() {
+ return executionId;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.vertexId.read(in);
+ this.subtaskIndex = in.readInt();
+ this.success = in.readBoolean();
+
+ if (in.readBoolean()) {
+ this.description = StringValue.readString(in);
+ }
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ this.vertexId.write(out);
+ out.writeInt(subtaskIndex);
+ out.writeBoolean(success);
+
+ if (description != null) {
+ out.writeBoolean(true);
+ StringValue.writeString(description, out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java
deleted file mode 100644
index ae88f3b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskSubmissionResult.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-
-/**
- * A <code>TaskSubmissionResult</code> is used to report the results
- * of a task submission. It contains the ID of the submitted task, a return code and
- * a description. In case of a submission error the description includes an error message.
- *
- */
-public class TaskSubmissionResult extends AbstractTaskResult {
-
- /**
- * Constructs a new task submission result.
- *
- * @param vertexID
- * the task ID this result belongs to
- * @param returnCode
- * the return code of the submission
- */
- public TaskSubmissionResult(ExecutionVertexID vertexID, ReturnCode returnCode) {
- super(vertexID, returnCode);
- }
-
- /**
- * Constructs an empty task submission result.
- */
- public TaskSubmissionResult() {
- super();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
index 9694f63..312c9ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
@@ -16,18 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.flink.core.io.StringRecord;
+import org.apache.flink.types.StringValue;
/**
* Auxiliary class to (de)serialize enumeration values.
- *
*/
public final class EnumUtils {
@@ -56,7 +54,7 @@ public final class EnumUtils {
return null;
}
- return T.valueOf(enumType, StringRecord.readString(in));
+ return T.valueOf(enumType, StringValue.readString(in));
}
/**
@@ -75,7 +73,7 @@ public final class EnumUtils {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
- StringRecord.writeString(out, enumVal.name());
+ StringValue.writeString(enumVal.name(), out);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 964a754..fa7ad1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -19,20 +19,46 @@
package org.apache.flink.runtime.util;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class ExecutorThreadFactory implements ThreadFactory {
+ private static final Log LOG = LogFactory.getLog(ExecutorThreadFactory.class);
+
+
+ private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - ";
+
+ private static final AtomicInteger COUNTER = new AtomicInteger(1);
+
+ private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads");
+
+ private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHander();
+
+
public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-
- private static final String THREAD_NAME = "Flink Executor Thread";
+ // --------------------------------------------------------------------------------------------
private ExecutorThreadFactory() {}
public Thread newThread(Runnable target) {
- Thread t = new Thread(target, THREAD_NAME);
+ Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement());
t.setDaemon(true);
+ t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
return t;
}
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class LoggingExceptionHander implements Thread.UncaughtExceptionHandler {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
index d4db568..5bb9451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
@@ -110,45 +110,11 @@ public class SerializableArrayList<E extends IOReadableWritable> extends ArrayLi
@Override
- public boolean equals(final Object obj) {
-
+ public boolean equals(Object obj) {
if (!(obj instanceof SerializableArrayList<?>)) {
return false;
}
- final SerializableArrayList<?> sal = (SerializableArrayList<?>) obj;
-
- if (this.size() != sal.size()) {
- return false;
- }
-
- final Iterator<E> it = iterator();
- final Iterator<?> it2 = sal.iterator();
- while (it.hasNext()) {
-
- final E e = it.next();
- final Object obj2 = it2.next();
- if (!e.equals(obj2)) {
- return false;
- }
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
-
- int hashCode = Integer.MIN_VALUE;
-
- if (!isEmpty()) {
- final E e = get(0);
- hashCode += Math.abs(e.getClass().hashCode());
- }
-
- hashCode += size();
-
- return hashCode;
+ return (obj instanceof SerializableArrayList) && super.equals(obj);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
index bc3173f..9ae6e2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.client;
import static org.junit.Assert.assertEquals;
@@ -36,7 +35,6 @@ import org.junit.Test;
/**
* This class contains test concerning all classes which are derived from {@link AbstractJobResult}.
- *
*/
public class JobResultTest {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
index d0b1504..ffbcf41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptorTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.deployment;
import static org.junit.Assert.assertEquals;
@@ -30,10 +29,7 @@ import org.apache.flink.runtime.testutils.ServerTestUtils;
import org.apache.flink.util.StringUtils;
import org.junit.Test;
-/**
- * This class contains unit tests for the {@link ChannelDeploymentDescriptor} class.
- *
- */
+
public class ChannelDeploymentDescriptorTest {
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java
deleted file mode 100644
index 68d285a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptorTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.deployment;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-/**
- * This class contains unit tests for the {@link GateDeploymentDescriptor} class.
- *
- */
-public class GateDeploymentDescriptorTest {
-
- /**
- * Tests the constructor of the {@link GateDeploymentDescriptor} class with valid arguments.
- */
- @Test
- public void testConstructorWithValidArguments() {
-
- final GateID gateID = new GateID();
- final ChannelType channelType = ChannelType.IN_MEMORY;
- final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
-
- final GateDeploymentDescriptor gdd = new GateDeploymentDescriptor(gateID, channelType, channels);
-
- assertEquals(gateID, gdd.getGateID());
- assertEquals(channelType, gdd.getChannelType());
- assertEquals(channels.size(), gdd.getNumberOfChannelDescriptors());
- }
-
- /**
- * Tests the constructor of the {@link GateDeploymentDescriptor} class with valid arguments.
- */
- @Test
- public void testConstructorWithInvalidArguments() {
-
- final GateID gateID = new GateID();
- final ChannelType channelType = ChannelType.IN_MEMORY;
- final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
-
- boolean firstExceptionCaught = false;
- boolean secondExceptionCaught = false;
- boolean thirdExceptionCaught = false;
-
- try {
- new GateDeploymentDescriptor(null, channelType, channels);
- } catch (IllegalArgumentException e) {
- firstExceptionCaught = true;
- }
-
- try {
- new GateDeploymentDescriptor(gateID, null, channels);
- } catch (IllegalArgumentException e) {
- secondExceptionCaught = true;
- }
-
- try {
- new GateDeploymentDescriptor(gateID, channelType, null);
- } catch (IllegalArgumentException e) {
- thirdExceptionCaught = true;
- }
-
- if (!firstExceptionCaught) {
- fail("First argument was illegal but not detected");
- }
-
- if (!secondExceptionCaught) {
- fail("Second argument was illegal but not detected");
- }
-
-
- if (!thirdExceptionCaught) {
- fail("Third argument was illegal but not detected");
- }
- }
-
- /**
- * Tests the serialization/deserialization of the {@link GateDeploymentDescriptor} class.
- */
- @Test
- public void testSerialization() {
-
- final GateID gateID = new GateID();
- final ChannelType channelType = ChannelType.IN_MEMORY;
- final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
- final ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(new ChannelID(), new ChannelID());
- channels.add(cdd);
-
- final GateDeploymentDescriptor orig = new GateDeploymentDescriptor(gateID, channelType,
- channels);
-
- GateDeploymentDescriptor copy = null;
-
- try {
- copy = ServerTestUtils.createCopy(orig);
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- }
-
- assertFalse(orig.getGateID() == copy.getGateID());
-
- assertEquals(orig.getGateID(), copy.getGateID());
- assertEquals(orig.getChannelType(), copy.getChannelType());
- assertEquals(orig.getNumberOfChannelDescriptors(), copy.getNumberOfChannelDescriptors());
- assertEquals(orig.getChannelDescriptor(0).getOutputChannelID(), copy.getChannelDescriptor(0)
- .getOutputChannelID());
- assertEquals(orig.getChannelDescriptor(0).getInputChannelID(), copy.getChannelDescriptor(0).getInputChannelID());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 3a1aa40..1f66f93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -16,275 +16,64 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.deployment;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.apache.flink.runtime.util.SerializableArrayList;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Test;
-/**
- * This class contains unit tests for the {@link TaskDeploymentDescriptor} class.
- *
- */
public class TaskDeploymentDescriptorTest {
- /**
- * Tests the constructor of the {@link TaskDeploymentDescriptor} class with valid arguments.
- */
- @Test
- public void testConstructorWithValidArguments() {
-
- final JobID jobID = new JobID();
- final ExecutionVertexID vertexID = new ExecutionVertexID();
- final String taskName = "task name";
- final int indexInSubtaskGroup = 0;
- final int currentNumberOfSubtasks = 1;
- final Configuration jobConfiguration = new Configuration();
- final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
- final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
- 0);
- final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
- 0);
-
- final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
- assertEquals(jobID, tdd.getJobID());
- assertEquals(vertexID, tdd.getVertexID());
- assertEquals(taskName, tdd.getTaskName());
- assertEquals(indexInSubtaskGroup, tdd.getIndexInSubtaskGroup());
- assertEquals(currentNumberOfSubtasks, tdd.getCurrentNumberOfSubtasks());
- assertEquals(jobConfiguration, tdd.getJobConfiguration());
- assertEquals(taskConfiguration, tdd.getTaskConfiguration());
- assertEquals(invokableClass, tdd.getInvokableClass());
- assertEquals(outputGates.size(), tdd.getNumberOfOutputGateDescriptors());
- assertEquals(inputGates.size(), tdd.getNumberOfInputGateDescriptors());
- }
-
- /**
- * Tests the constructor of the {@link GateDeploymentDescriptor} class with valid arguments.
- */
- @Test
- public void testConstructorWithInvalidArguments() {
-
- final JobID jobID = new JobID();
- final ExecutionVertexID vertexID = new ExecutionVertexID();
- final String taskName = "task name";
- final int indexInSubtaskGroup = 0;
- final int currentNumberOfSubtasks = 1;
- final Configuration jobConfiguration = new Configuration();
- final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
- final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
- 0);
- final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
- 0);
-
- boolean firstExceptionCaught = false;
- boolean secondExceptionCaught = false;
- boolean thirdExceptionCaught = false;
- boolean forthExceptionCaught = false;
- boolean fifthExceptionCaught = false;
- boolean sixthExceptionCaught = false;
- boolean seventhExceptionCaught = false;
- boolean eighthExceptionCaught = false;
- boolean ninethExeceptionCaught = false;
- boolean tenthExceptionCaught = false;
-
- try {
- new TaskDeploymentDescriptor(null, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- firstExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, null, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- secondExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, null,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- thirdExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- -1, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- forthExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, -1, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- fifthExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, null, taskConfiguration,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- sixthExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, null,
- invokableClass, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- seventhExceptionCaught = true;
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- null, outputGates, inputGates);
- } catch (IllegalArgumentException e) {
- eighthExceptionCaught = true;
-
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, null, inputGates);
- } catch (IllegalArgumentException e) {
- ninethExeceptionCaught = true;
-
- }
-
- try {
- new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, null);
- } catch (IllegalArgumentException e) {
- tenthExceptionCaught = true;
- }
-
- if (!firstExceptionCaught) {
- fail("First argument was illegal but not detected");
- }
-
- if (!secondExceptionCaught) {
- fail("Second argument was illegal but not detected");
- }
-
- if (!thirdExceptionCaught) {
- fail("Third argument was illegal but not detected");
- }
-
- if (!forthExceptionCaught) {
- fail("Forth argument was illegal but not detected");
- }
-
- if (!fifthExceptionCaught) {
- fail("Fifth argument was illegal but not detected");
- }
-
- if (!sixthExceptionCaught) {
- fail("Sixth argument was illegal but not detected");
- }
-
- if (!seventhExceptionCaught) {
- fail("Seventh argument was illegal but not detected");
- }
-
- if (!eighthExceptionCaught) {
- fail("Eighth argument was illegal but not detected");
- }
-
- if (!ninethExeceptionCaught) {
- fail("Nineth argument was illegal but not detected");
- }
-
- if (!tenthExceptionCaught) {
- fail("Tenth argument was illegal but not detected");
- }
-
- }
-
- /**
- * Tests the serialization/deserialization of the {@link TaskDeploymentDescriptor} class.
- */
@Test
public void testSerialization() {
-
- final JobID jobID = new JobID();
- final ExecutionVertexID vertexID = new ExecutionVertexID();
- final String taskName = "task name";
- final int indexInSubtaskGroup = 0;
- final int currentNumberOfSubtasks = 1;
- final Configuration jobConfiguration = new Configuration();
- final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
- final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
- 0);
- final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
- 0);
-
- final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, taskName,
- indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
- invokableClass, outputGates, inputGates);
-
- TaskDeploymentDescriptor copy = null;
-
- try {
- LibraryCacheManager.register(jobID, new String[] {});
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- }
-
- try {
- copy = ServerTestUtils.createCopy(orig);
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- }
-
- assertFalse(orig.getJobID() == copy.getJobID());
- assertFalse(orig.getVertexID() == copy.getVertexID());
- assertFalse(orig.getTaskName() == copy.getTaskName());
- assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
- assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
-
- assertEquals(orig.getJobID(), copy.getJobID());
- assertEquals(orig.getVertexID(), copy.getVertexID());
- assertEquals(orig.getTaskName(), copy.getTaskName());
- assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
- assertEquals(orig.getCurrentNumberOfSubtasks(), copy.getCurrentNumberOfSubtasks());
- assertEquals(orig.getNumberOfOutputGateDescriptors(), copy.getNumberOfOutputGateDescriptors());
- assertEquals(orig.getNumberOfInputGateDescriptors(), copy.getNumberOfInputGateDescriptors());
-
try {
- LibraryCacheManager.register(jobID, new String[] {});
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
+ final JobID jobID = new JobID();
+ final JobVertexID vertexID = new JobVertexID();
+ final ExecutionAttemptID execId = new ExecutionAttemptID();
+ final String taskName = "task name";
+ final int indexInSubtaskGroup = 0;
+ final int currentNumberOfSubtasks = 1;
+ final Configuration jobConfiguration = new Configuration();
+ final Configuration taskConfiguration = new Configuration();
+ final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
+ final List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(0);
+ final List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(0);
+
+ final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
+ indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
+ invokableClass.getName(), outputGates, inputGates, new String[] { "jar1", "jar2" }, 47);
+
+ final TaskDeploymentDescriptor copy = CommonTestUtils.createCopyWritable(orig);
+
+ assertFalse(orig.getJobID() == copy.getJobID());
+ assertFalse(orig.getVertexID() == copy.getVertexID());
+ assertFalse(orig.getTaskName() == copy.getTaskName());
+ assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
+ assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
+
+ assertEquals(orig.getJobID(), copy.getJobID());
+ assertEquals(orig.getVertexID(), copy.getVertexID());
+ assertEquals(orig.getTaskName(), copy.getTaskName());
+ assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
+ assertEquals(orig.getCurrentNumberOfSubtasks(), copy.getCurrentNumberOfSubtasks());
+ assertEquals(orig.getOutputGates(), copy.getOutputGates());
+ assertEquals(orig.getInputGates(), copy.getInputGates());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
}
}
}