Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/04/16 20:29:36 UTC
svn commit: r765713 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/filecache/ src/core/org/apache/hadoop/fs/
src/docs/src/documentation/content/xdocs/ src/mapred/
src/mapred/org/apache/hadoop/mapred/
Author: yhemanth
Date: Thu Apr 16 18:29:35 2009
New Revision: 765713
URL: http://svn.apache.org/viewvc?rev=765713&view=rev
Log:
HADOOP-4490. Provide ability to run tasks as job owners. Contributed by Sreekanth Ramakrishnan.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java
hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
hadoop/core/trunk/src/mapred/mapred-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 16 18:29:35 2009
@@ -237,6 +237,9 @@
without having to restart the daemon.
(Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli via yhemanth)
+ HADOOP-4490. Provide ability to run tasks as job owners.
+ (Sreekanth Ramakrishnan via yhemanth)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Thu Apr 16 18:29:35 2009
@@ -428,7 +428,8 @@
// do chmod here
try {
- FileUtil.chmod(parchive.toString(), "+x");
+ //Setting recursive permission to grant everyone read and execute
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
} catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
}
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java Thu Apr 16 18:29:35 2009
@@ -27,6 +27,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.mortbay.log.Log;
/**
* A collection of file-processing util methods
@@ -706,12 +707,42 @@
*/
public static int chmod(String filename, String perm
) throws IOException, InterruptedException {
- String cmd = "chmod " + perm + " " + filename;
- Process p = Runtime.getRuntime().exec(cmd, null);
- return p.waitFor();
+ return chmod(filename, perm, false);
}
/**
+ * Change the permissions on a file / directory, recursively, if
+ * needed.
+ * @param filename name of the file whose permissions are to change
+ * @param perm permission string
+ * @param recursive true, if permissions should be changed recursively
+ * @return the exit code from the command.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static int chmod(String filename, String perm, boolean recursive)
+ throws IOException, InterruptedException {
+ StringBuffer cmdBuf = new StringBuffer();
+ cmdBuf.append("chmod ");
+ if (recursive) {
+ cmdBuf.append("-R ");
+ }
+ cmdBuf.append(perm).append(" ");
+ cmdBuf.append(filename);
+ String[] shellCmd = {"bash", "-c" ,cmdBuf.toString()};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(shellCmd);
+ try {
+ shExec.execute();
+ }catch(Exception e) {
+ if(Log.isDebugEnabled()) {
+ Log.debug("Error while changing permission : " + filename
+ +" Exception: " + StringUtils.stringifyException(e));
+ }
+ }
+ return shExec.getExitCode();
+ }
+
+ /**
* Create a tmp file for a base file.
* @param basefile the base file of the tmp
* @param prefix file name prefix of tmp
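For context, the new three-argument chmod overload above is what DistributedCache now calls with "ugo+rx" on the cache base directory. A minimal usage sketch follows; the path is a hypothetical placeholder and the class is illustrative only, not part of this commit:

  import java.io.IOException;
  import org.apache.hadoop.fs.FileUtil;

  public class RecursiveChmodExample {
    public static void main(String[] args)
        throws IOException, InterruptedException {
      // Recursively grant read and execute to user, group and others on a
      // hypothetical cache directory, mirroring the DistributedCache change.
      int exitCode = FileUtil.chmod("/tmp/hadoop-cache/archive", "ugo+rx", true);
      if (exitCode != 0) {
        System.err.println("chmod exited with code " + exitCode);
      }
    }
  }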
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Thu Apr 16 18:29:35 2009
@@ -474,6 +474,122 @@
</ul>
</section>
+ <section>
+ <title>Task Controllers</title>
+ <p>Task controllers are classes in the Hadoop Map/Reduce
+ framework that define how users' map and reduce tasks
+ are launched and controlled. They can
+ be used in clusters that require some customization in
+ the process of launching or controlling the user tasks.
+ For example, in some
+ clusters, there may be a requirement to run tasks as
+ the user who submitted the job, instead of as the task
+ tracker user, which is how tasks are launched by default.
+ This section describes how to configure and use
+ task controllers.</p>
+ <p>The following task controllers are the available in
+ Hadoop.
+ </p>
+ <table>
+ <tr><th>Name</th><th>Class Name</th><th>Description</th></tr>
+ <tr>
+ <td>DefaultTaskController</td>
+ <td>org.apache.hadoop.mapred.DefaultTaskController</td>
+ <td> The default task controller which Hadoop uses to manage task
+ execution. The tasks run as the task tracker user.</td>
+ </tr>
+ <tr>
+ <td>LinuxTaskController</td>
+ <td>org.apache.hadoop.mapred.LinuxTaskController</td>
+ <td>This task controller, which is supported only on Linux,
+ runs the tasks as the user who submitted the job. It requires
+ these user accounts to be created on the cluster nodes
+ where the tasks are launched. It
+ uses a setuid executable that is included in the Hadoop
+ distribution. The task tracker uses this executable to
+ launch and kill tasks. The setuid executable switches to
+ the user who has submitted the job and launches or kills
+ the tasks. Currently, this task controller
+ opens up permissions on local files and directories used
+ by the tasks, such as the job jar files, distributed archive
+ files, intermediate files and task log files. In the future,
+ it is expected that stricter file permissions will be used.
+ </td>
+ </tr>
+ </table>
+ <section>
+ <title>Configuring Task Controllers</title>
+ <p>The task controller to be used can be configured by setting the
+ value of the following key in mapred-site.xml</p>
+ <table>
+ <tr>
+ <th>Property</th><th>Value</th><th>Notes</th>
+ </tr>
+ <tr>
+ <td>mapred.task.tracker.task-controller</td>
+ <td>Fully qualified class name of the task controller class</td>
+ <td>Currently there are two implementations of task controller
+ in the Hadoop system, DefaultTaskController and LinuxTaskController.
+ Refer to the class names mentioned above to determine the value
+ to set for the class of choice.
+ </td>
+ </tr>
+ </table>
+ </section>
+ <section>
+ <title>Using the LinuxTaskController</title>
+ <p>This section of the document describes the steps required to
+ use the LinuxTaskController.</p>
+
+ <p>In order to use the LinuxTaskController, a setuid executable
+ should be built and deployed on the compute nodes. The
+ executable is named task-controller. To build the executable,
+ execute
+ <em>ant task-controller -Dhadoop.conf.dir=/path/to/conf/dir.
+ </em>
+ The path passed in <em>-Dhadoop.conf.dir</em> should be the path
+ on the cluster nodes where a configuration file for the setuid
+ executable would be located. The executable would be built to
+ <em>build.dir/dist.dir/bin</em> and should be installed to
+ <em>$HADOOP_HOME/bin</em>.
+ </p>
+
+ <p>
+ The executable must be deployed as a setuid executable, by changing
+ the ownership to <em>root</em> and giving it permissions <em>4755</em>.
+ </p>
+
+ <p>The executable requires a configuration file called
+ <em>taskcontroller.cfg</em> to be
+ present in the configuration directory passed to the ant target
+ mentioned above. If the binary was not built with a specific
+ conf directory, the path defaults to <em>/path-to-binary/../conf</em>.
+ </p>
+
+ <p>The executable requires following configuration items to be
+ present in the <em>taskcontroller.cfg</em> file. The items should
+ be mentioned as simple <em>key=value</em> pairs.
+ </p>
+ <table><tr><th>Name</th><th>Description</th></tr>
+ <tr>
+ <td>mapred.local.dir</td>
+ <td>Path to mapred local directories. Should be same as the value
+ which was provided to key in mapred-site.xml. This is required to
+ validate paths passed to the setuid executable in order to prevent
+ arbitrary paths being passed to it.</td>
+ </tr>
+ </table>
+
+ <p>
+ The LinuxTaskController requires the paths leading up to
+ the directories specified in
+ <em>mapred.local.dir</em> and <em>hadoop.log.dir</em> to have 755
+ permissions, and the directories themselves to have 777 permissions.
+ </p>
+ </section>
+
+ </section>
+
</section>
<section>
Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Thu Apr 16 18:29:35 2009
@@ -970,4 +970,11 @@
</description>
</property>
+<property>
+ <name>mapred.task.tracker.task-controller</name>
+ <value>org.apache.hadoop.mapred.DefaultTaskController</value>
+ <description>TaskController which is used to launch and manage task execution
+ </description>
+</property>
+
</configuration>
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=765713&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Thu Apr 16 18:29:35 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The default implementation for controlling tasks.
+ *
+ * This class provides an implementation for launching and killing
+ * tasks that need to be run as the tasktracker itself. Hence,
+ * many of the initializing or cleanup methods are not required here.
+ */
+class DefaultTaskController extends TaskController {
+
+ private static final Log LOG =
+ LogFactory.getLog(DefaultTaskController.class);
+ /**
+ * Launch a new JVM for the task.
+ *
+ * This method launches the new JVM for the task by executing the
+ * the JVM command using the {@link Shell.ShellCommandExecutor}
+ */
+ void launchTaskJVM(TaskController.TaskControllerContext context)
+ throws IOException {
+ JvmEnv env = context.env;
+ List<String> wrappedCommand =
+ TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
+ env.logSize, true, env.pidFile);
+ ShellCommandExecutor shexec =
+ new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
+ env.workDir, env.env);
+ // set the ShellCommandExecutor for later use.
+ context.shExec = shexec;
+ shexec.execute();
+ }
+
+ /**
+ * Kills the JVM running the task stored in the context.
+ *
+ * @param context the context storing the task running within the JVM
+ * that needs to be killed.
+ */
+ void killTaskJVM(TaskController.TaskControllerContext context) {
+ ShellCommandExecutor shexec = context.shExec;
+ JvmEnv env = context.env;
+ if (shexec != null) {
+ Process process = shexec.getProcess();
+ if (process != null) {
+ if (Shell.WINDOWS) {
+ process.destroy();
+ }
+ else {
+ Path pidFilePath = new Path(env.pidFile);
+ String pid = ProcessTree.getPidFromPidFile(
+ pidFilePath.toString());
+ if (pid != null) {
+ long sleeptimeBeforeSigkill = env.conf.getLong(
+ "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+ ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+ ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
+ ProcessTree.isSetsidAvailable, false);
+ try {
+ LOG.info("Process exited with exit code:" + process.waitFor());
+ } catch (InterruptedException ie) {}
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize the task environment.
+ *
+ * Since tasks are launched as the tasktracker user itself, this
+ * method has no action to perform.
+ */
+ void initializeTask(TaskController.TaskControllerContext context) {
+ // The default task controller does not need to set up
+ // any permissions for proper execution.
+ // So this is a dummy method.
+ return;
+ }
+
+
+ @Override
+ void setup() {
+ // nothing to setup
+ return;
+ }
+
+ /*
+ * No need to do anything, as we don't need anything
+ * extra beyond what the TaskTracker has already done.
+ */
+ @Override
+ void initializeJob(JobID jobId) {
+ }
+
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Thu Apr 16 18:29:35 2009
@@ -32,6 +32,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -54,9 +55,9 @@
public JvmManager(TaskTracker tracker) {
mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
- true);
+ true, tracker);
reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
- false);
+ false, tracker);
}
public void stop() {
@@ -124,11 +125,15 @@
int maxJvms;
boolean isMap;
+ TaskTracker tracker;
+
Random rand = new Random(System.currentTimeMillis());
- public JvmManagerForType(int maxJvms, boolean isMap) {
+ public JvmManagerForType(int maxJvms, boolean isMap,
+ TaskTracker tracker) {
this.maxJvms = maxJvms;
this.isMap = isMap;
+ this.tracker = tracker;
}
synchronized public void setRunningTaskForJvm(JVMId jvmId,
@@ -140,7 +145,23 @@
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
if (jvmToRunningTask.containsKey(jvmId)) {
- return jvmToRunningTask.get(jvmId).getTaskInProgress();
+ //In case of JVM reuse, tasks are returned to the previously launched
+ //JVM via this method. However, when a new task is launched,
+ //the task being returned has to be initialized.
+ TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+ JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+ Task task = taskRunner.getTaskInProgress().getTask();
+ TaskControllerContext context =
+ new TaskController.TaskControllerContext();
+ context.env = jvmRunner.env;
+ context.task = task;
+ //If we are returning the same task as which the JVM was launched
+ //we don't initialize task once again.
+ if(!jvmRunner.env.conf.get("mapred.task.id").
+ equals(task.getTaskID().toString())) {
+ tracker.getTaskController().initializeTask(context);
+ }
+ return taskRunner.getTaskInProgress();
}
return null;
}
@@ -254,7 +275,7 @@
}
//*MUST* never reach this
throw new RuntimeException("Inconsistent state!!! " +
- "JVM Manager reached an unstable state " +
+ "JVM Manager reached an unstable state " +
"while reaping a JVM for task: " + t.getTask().getTaskID()+
" " + getDetails());
}
@@ -317,6 +338,9 @@
JVMId jvmId;
volatile boolean busy = true;
private ShellCommandExecutor shexec; // shell terminal for running the task
+ //context used for starting JVM
+ private TaskControllerContext initalContext;
+
public JvmRunner(JvmEnv env, JobID jobId) {
this.env = env;
this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
@@ -328,18 +352,19 @@
}
public void runChild(JvmEnv env) {
+ initalContext = new TaskControllerContext();
try {
env.vargs.add(Integer.toString(jvmId.getId()));
- List<String> wrappedCommand =
- TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, true, env.pidFile);
- shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
- env.workDir, env.env);
- shexec.execute();
+ //Launch the task controller to run task JVM
+ initalContext.task = jvmToRunningTask.get(jvmId).getTask();
+ initalContext.env = env;
+ tracker.getTaskController().initializeTask(initalContext);
+ tracker.getTaskController().launchTaskJVM(initalContext);
} catch (IOException ioe) {
// do nothing
// error and output are appropriately redirected
} finally { // handle the exit code
+ shexec = initalContext.shExec;
if (shexec == null) {
return;
}
@@ -364,29 +389,14 @@
* of processes) is created using setsid.
*/
public void kill() {
- if (shexec != null) {
- Process process = shexec.getProcess();
- if (process != null) {
- if (Shell.WINDOWS) {
- process.destroy();
- }
- else {
- Path pidFilePath = new Path(env.pidFile);
- String pid = ProcessTree.getPidFromPidFile(
- pidFilePath.toString());
- if (pid != null) {
- long sleeptimeBeforeSigkill = env.conf.getLong(
- "mapred.tasktracker.tasks.sleeptime-before-sigkill",
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
- ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
- ProcessTree.isSetsidAvailable, false);
- try {
- LOG.info("Process exited with exit code:" + process.waitFor());
- } catch (InterruptedException ie) {}
- }
- }
- }
+ TaskController controller = tracker.getTaskController();
+ //Check inital context before issuing a kill to prevent situations
+ //where kill is issued before task is launched.
+ if(initalContext != null && initalContext.env != null) {
+ controller.killTaskJVM(initalContext);
+ } else {
+ LOG.info(String.format("JVM Not killed %s but just removed",
+ jvmId.toString()));
}
removeJvm(jvmId);
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=765713&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Thu Apr 16 18:29:35 2009
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A {@link TaskController} that runs the task JVMs as the user
+ * who submits the job.
+ *
+ * This class executes a setuid executable to implement methods
+ * of the {@link TaskController}, including launching the task
+ * JVM and killing it when needed, and also initializing and
+ * finalizing the task environment.
+ * <p> The setuid executable is launched using the command line:</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
+ * <p>command is one of the cardinal value of the
+ * {@link LinuxTaskController.TaskCommands} enumeration</p>
+ * <p>command-args depends on the command being launched.</p>
+ *
+ * In addition to running and killing tasks, the class also
+ * sets up appropriate access for the directories and files
+ * that will be used by the tasks.
+ */
+class LinuxTaskController extends TaskController {
+
+ private static final Log LOG =
+ LogFactory.getLog(LinuxTaskController.class);
+
+ // Name of the executable script that will contain the child
+ // JVM command line. See writeCommand for details.
+ private static final String COMMAND_FILE = "taskjvm.sh";
+
+ // Path to the setuid executable.
+ private static String taskControllerExe;
+
+ static {
+ // the task-controller is expected to be under the $HADOOP_HOME/bin
+ // directory.
+ File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+ taskControllerExe =
+ new File(hadoopBin, "task-controller").getAbsolutePath();
+ }
+
+ // The list of directory paths specified in the
+ // variable mapred.local.dir. This is used to determine
+ // which among the list of directories is picked up
+ // for storing data for a particular task.
+ private String[] mapredLocalDirs;
+
+ // permissions to set on files and directories created.
+ // When localized files are handled securely, this string
+ // will change to something more restrictive. Until then,
+ // it opens up the permissions for all, so that the tasktracker
+ // and job owners can access files together.
+ private static final String FILE_PERMISSIONS = "ugo+rwx";
+
+ // permissions to set on components of the path leading to
+ // localized files and directories. Read and execute permissions
+ // are required for different users to be able to access the
+ // files.
+ private static final String PATH_PERMISSIONS = "go+rx";
+
+ public LinuxTaskController() {
+ super();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ mapredLocalDirs = conf.getStrings("mapred.local.dir");
+ //Setting of the permissions of the local directory is done in
+ //setup()
+ }
+
+ /**
+ * List of commands that the setuid script will execute.
+ */
+ enum TaskCommands {
+ LAUNCH_TASK_JVM,
+ KILL_TASK_JVM
+ }
+
+ /**
+ * Launch a task JVM that will run as the owner of the job.
+ *
+ * This method launches a task JVM by executing a setuid
+ * executable that will switch to the user and run the
+ * task.
+ */
+ @Override
+ void launchTaskJVM(TaskController.TaskControllerContext context)
+ throws IOException {
+ JvmEnv env = context.env;
+ // get the JVM command line.
+ String cmdLine =
+ TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
+ env.logSize, true, env.pidFile);
+
+ // write the command to a file in the
+ // task specific cache directory
+ writeCommand(cmdLine, getTaskCacheDirectory(context));
+
+ // Call the taskcontroller with the right parameters.
+ List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
+ ShellCommandExecutor shExec = buildTaskControllerExecutor(
+ TaskCommands.LAUNCH_TASK_JVM,
+ env.conf.getUser(),
+ launchTaskJVMArgs, env);
+ context.shExec = shExec;
+ shExec.execute();
+ LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ }
+
+ // convenience API for building command arguments for specific commands
+ private List<String> buildTaskCommandArgs(TaskControllerContext context) {
+ List<String> commandArgs = new ArrayList<String>(3);
+ String taskId = context.task.getTaskID().toString();
+ String jobId = getJobId(context);
+ commandArgs.add(jobId);
+ if(!context.task.isTaskCleanupTask()) {
+ commandArgs.add(taskId);
+ }else {
+ commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+ }
+
+ LOG.debug("getting the task directory as: "
+ + getTaskCacheDirectory(context));
+ commandArgs.add(getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context));
+ return commandArgs;
+ }
+
+ // get the Job ID from the information in the TaskControllerContext
+ private String getJobId(TaskControllerContext context) {
+ String taskId = context.task.getTaskID().toString();
+ TaskAttemptID tId = TaskAttemptID.forName(taskId);
+ String jobId = tId.getJobID().toString();
+ return jobId;
+ }
+
+ // Get the directory from the list of directories configured
+ // in mapred.local.dir chosen for storing data pertaining to
+ // this task.
+ private String getDirectoryChosenForTask(File directory,
+ TaskControllerContext context) {
+ String jobId = getJobId(context);
+ String taskId = context.task.getTaskID().toString();
+ for (String dir : mapredLocalDirs) {
+ File mapredDir = new File(dir);
+ File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
+ jobId, taskId, context.task.isTaskCleanupTask()));
+ if (directory.equals(taskDir)) {
+ return dir;
+ }
+ }
+
+ LOG.error("Couldn't parse task cache directory correctly");
+ throw new IllegalArgumentException("invalid task cache directory "
+ + directory.getAbsolutePath());
+ }
+
+ /**
+ * Kill a launched task JVM running as the user of the job.
+ *
+ * This method will launch the task controller setuid executable
+ * that in turn will kill the task JVM by sending a kill signal.
+ */
+ void killTaskJVM(TaskControllerContext context) {
+
+ if(context.task == null) {
+ LOG.info("Context task null not killing the JVM");
+ return;
+ }
+
+ JvmEnv env = context.env;
+ List<String> killTaskJVMArgs = buildTaskCommandArgs(context);
+ try {
+ ShellCommandExecutor shExec = buildTaskControllerExecutor(
+ TaskCommands.KILL_TASK_JVM,
+ context.env.conf.getUser(),
+ killTaskJVMArgs,
+ context.env);
+ shExec.execute();
+ LOG.debug("Command output :" +shExec.getOutput());
+ } catch (IOException ioe) {
+ LOG.warn("IOException in killing task: " + ioe.getMessage());
+ }
+ }
+
+ /**
+ * Setup appropriate permissions for directories and files that
+ * are used by the task.
+ *
+ * As the LinuxTaskController launches tasks as a user, different
+ * from the daemon, all directories and files that are potentially
+ * used by the tasks are setup with appropriate permissions that
+ * will allow access.
+ *
+ * Until secure data handling is implemented (see HADOOP-4491 and
+ * HADOOP-4493, for e.g.), the permissions are set up to allow
+ * read, write and execute access for everyone. This will be
+ * changed to restricted access as data is handled securely.
+ */
+ void initializeTask(TaskControllerContext context) {
+ // Setup permissions for the job and task cache directories.
+ setupTaskCacheFileAccess(context);
+ // setup permissions for task log directory
+ setupTaskLogFileAccess(context);
+ }
+
+ // Allows access for the task to create log files under
+ // the task log directory
+ private void setupTaskLogFileAccess(TaskControllerContext context) {
+ TaskAttemptID taskId = context.task.getTaskID();
+ File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
+ String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
+ changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
+ }
+
+ // Allows access for the task to read, write and execute
+ // the files under the job and task cache directories
+ private void setupTaskCacheFileAccess(TaskControllerContext context) {
+ String taskId = context.task.getTaskID().toString();
+ JobID jobId = JobID.forName(getJobId(context));
+ //Change permission for the task across all the disks
+ for(String localDir : mapredLocalDirs) {
+ File f = new File(localDir);
+ File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
+ jobId.toString(), taskId, context.task.isTaskCleanupTask()));
+ if(taskCacheDir.exists()) {
+ changeDirectoryPermissions(taskCacheDir.getPath(),
+ FILE_PERMISSIONS, true);
+ }
+ }//end of local directory Iteration
+ }
+
+ // convenience method to execute chmod.
+ private void changeDirectoryPermissions(String dir, String mode,
+ boolean isRecursive) {
+ int ret = 0;
+ try {
+ ret = FileUtil.chmod(dir, mode, isRecursive);
+ } catch (Exception e) {
+ LOG.warn("Exception in changing permissions for directory " + dir +
+ ". Exception: " + e.getMessage());
+ }
+ if (ret != 0) {
+ LOG.warn("Could not change permissions for directory " + dir);
+ }
+ }
+
+ // convenience API to create the executor for launching the
+ // setuid script.
+ private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command,
+ String userName,
+ List<String> cmdArgs, JvmEnv env)
+ throws IOException {
+ String[] taskControllerCmd = new String[3 + cmdArgs.size()];
+ taskControllerCmd[0] = taskControllerExe;
+ taskControllerCmd[1] = userName;
+ taskControllerCmd[2] = String.valueOf(command.ordinal());
+ int i = 3;
+ for (String cmdArg : cmdArgs) {
+ taskControllerCmd[i++] = cmdArg;
+ }
+ if (LOG.isDebugEnabled()) {
+ for (String cmd : taskControllerCmd) {
+ LOG.debug("taskctrl command = " + cmd);
+ }
+ }
+ ShellCommandExecutor shExec = null;
+ if(env.workDir != null && env.workDir.exists()) {
+ shExec = new ShellCommandExecutor(taskControllerCmd,
+ env.workDir, env.env);
+ } else {
+ shExec = new ShellCommandExecutor(taskControllerCmd);
+ }
+
+ return shExec;
+ }
+
+ // Return the task specific directory under the cache.
+ private String getTaskCacheDirectory(TaskControllerContext context) {
+ // In the case of JVM reuse, the task specific directory
+ // is different from what env.workDir points to.
+ // Hence this is built from the taskId every time.
+ String taskId = context.task.getTaskID().toString();
+ File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
+ if(context.task.isTaskCleanupTask()) {
+ taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
+ }
+ return new File(cacheDirForJob, taskId).getAbsolutePath();
+ }
+
+ // Write the JVM command line to a file under the specified directory
+ // Note that the JVM will be launched using a setuid executable, and
+ // could potentially contain strings defined by a user. Hence, to
+ // prevent special character attacks, we write the command line to
+ // a file and execute it.
+ private void writeCommand(String cmdLine,
+ String directory) throws IOException {
+
+ PrintWriter pw = null;
+ String commandFile = directory + File.separator + COMMAND_FILE;
+ LOG.info("Writing commands to " + commandFile);
+ try {
+ FileWriter fw = new FileWriter(commandFile);
+ BufferedWriter bw = new BufferedWriter(fw);
+ pw = new PrintWriter(bw);
+ pw.write(cmdLine);
+ } catch (IOException ioe) {
+ LOG.error("Caught IOException while writing JVM command line to file. "
+ + ioe.getMessage());
+ } finally {
+ if (pw != null) {
+ pw.close();
+ }
+ // set execute permissions for all on the file.
+ File f = new File(commandFile);
+ if (f.exists()) {
+ f.setReadable(true, false);
+ f.setExecutable(true, false);
+ }
+ }
+ }
+
+
+ /**
+ * Sets up the permissions of the following directories:
+ *
+ * Job cache directory
+ * Archive directory
+ * Hadoop log directories
+ *
+ */
+ @Override
+ void setup() {
+ //set up job cache directory and associated permissions
+ String localDirs[] = this.mapredLocalDirs;
+ for(String localDir : localDirs) {
+ //Cache root
+ File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
+ File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
+ if(!cacheDirectory.exists()) {
+ if(!cacheDirectory.mkdirs()) {
+ LOG.warn("Unable to create cache directory : " +
+ cacheDirectory.getPath());
+ }
+ }
+ if(!jobCacheDirectory.exists()) {
+ if(!jobCacheDirectory.mkdirs()) {
+ LOG.warn("Unable to create job cache directory : " +
+ jobCacheDirectory.getPath());
+ }
+ }
+ //Give world writable permission for every directory under
+ //mapred-local-dir.
+ //Child tries to write files under it when executing.
+ changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
+ }//end of local directory manipulations
+ //setting up perms for user logs
+ File taskLog = TaskLog.getUserLogDir();
+ changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+ }
+
+ /*
+ * Create Job directories across disks and set their permissions to 777
+ * This way when tasks are run we just need to setup permissions for
+ * task folder.
+ */
+ @Override
+ void initializeJob(JobID jobid) {
+ for(String localDir : this.mapredLocalDirs) {
+ File jobDirectory = new File(localDir,
+ TaskTracker.getLocalJobDir(jobid.toString()));
+ if(!jobDirectory.exists()) {
+ if(!jobDirectory.mkdir()) {
+ LOG.warn("Unable to create job cache directory : "
+ + jobDirectory.getPath());
+ continue;
+ }
+ }
+ //Should be recursive because the jar and work folders might be
+ //present under the job cache directory
+ changeDirectoryPermissions(
+ jobDirectory.getPath(), FILE_PERMISSIONS, true);
+ }
+ }
+
+}
+
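Putting the LinuxTaskController pieces above together: buildTaskControllerExecutor prepends the executable path, the job owner's user name and the command ordinal, and buildTaskCommandArgs appends the job id, the task attempt id (plus the cleanup suffix for cleanup tasks) and the mapred.local.dir chosen for the task. A hedged illustration of the resulting argument vector, with every value made up:

  // Hypothetical argv for LAUNCH_TASK_JVM (ordinal 0); all values below are
  // illustrative only and not taken from this commit.
  public class TaskControllerArgvExample {
    public static void main(String[] args) {
      String[] taskControllerCmd = {
          "/opt/hadoop/bin/task-controller",        // taskControllerExe
          "alice",                                  // user who submitted the job
          "0",                                      // TaskCommands.LAUNCH_TASK_JVM.ordinal()
          "job_200904161829_0001",                  // job id
          "attempt_200904161829_0001_m_000000_0",   // task attempt id
          "/grid/0/mapred/local"                    // mapred.local.dir chosen for the task
      };
      for (String arg : taskControllerCmd) {
        System.out.print(arg + " ");
      }
      System.out.println();
    }
  }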
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=765713&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java Thu Apr 16 18:29:35 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * Controls initialization, finalization and clean up of tasks, and
+ * also the launching and killing of task JVMs.
+ *
+ * This class defines the API for initializing, finalizing and cleaning
+ * up tasks, as well as launching and killing task JVMs.
+ * Subclasses of this class will implement the logic required for
+ * performing the actual actions.
+ */
+abstract class TaskController implements Configurable {
+
+ private Configuration conf;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Setup task controller component.
+ *
+ */
+ abstract void setup();
+
+
+ /**
+ * Launch a task JVM
+ *
+ * This method defines how a JVM will be launched to run a task.
+ * @param context the context associated to the task
+ */
+ abstract void launchTaskJVM(TaskControllerContext context)
+ throws IOException;
+
+ /**
+ * Kill a task JVM
+ *
+ * This method defines how a JVM launched to execute one or more
+ * tasks will be killed.
+ * @param context
+ */
+ abstract void killTaskJVM(TaskControllerContext context);
+
+ /**
+ * Perform initializing actions required before a task can run.
+ *
+ * For instance, this method can be used to setup appropriate
+ * access permissions for files and directories that will be
+ * used by tasks. Tasks use the job cache, log, PID and distributed cache
+ * directories and files as part of their functioning. Typically,
+ * these files are shared between the daemon and the tasks
+ * themselves. So, a TaskController that is launching tasks
+ * as different users can implement this method to setup
+ * appropriate ownership and permissions for these directories
+ * and files.
+ */
+ abstract void initializeTask(TaskControllerContext context);
+
+
+ /**
+ * Contains task information required for the task controller.
+ */
+ static class TaskControllerContext {
+ // task being executed
+ Task task;
+ // the JVM environment for the task
+ JvmEnv env;
+ // the Shell executor executing the JVM for this task
+ ShellCommandExecutor shExec;
+ }
+
+ /**
+ * Method which is called after the job is localized so that task controllers
+ * can implement their own job localization logic.
+ *
+ * @param jobId the JobID of the job being localized.
+ */
+ abstract void initializeJob(JobID jobId);
+}
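The abstract class above defines the full contract a controller must satisfy. A minimal sketch of a hypothetical no-op implementation (not part of this commit) showing which methods a subclass has to provide:

  package org.apache.hadoop.mapred;

  import java.io.IOException;

  // Hypothetical controller that performs no extra work; shown only to
  // illustrate the abstract API defined above.
  class NoOpTaskController extends TaskController {

    @Override
    void setup() {
      // one-time setup of directories and permissions would go here
    }

    @Override
    void launchTaskJVM(TaskControllerContext context) throws IOException {
      // build and execute the JVM command for context.task using context.env
    }

    @Override
    void killTaskJVM(TaskControllerContext context) {
      // terminate the JVM recorded in context.shExec
    }

    @Override
    void initializeTask(TaskControllerContext context) {
      // per-task setup (e.g. file permissions) before the task runs
    }

    @Override
    void initializeJob(JobID jobId) {
      // per-job localization hooks
    }
  }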
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Thu Apr 16 18:29:35 2009
@@ -476,11 +476,36 @@
boolean useSetsid,
String pidFileName
) throws IOException {
- String stdout = FileUtil.makeShellPath(stdoutFilename);
- String stderr = FileUtil.makeShellPath(stderrFilename);
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
+ String mergedCmd = buildCommandLine(setup, cmd, stdoutFilename,
+ stderrFilename, tailLength,
+ useSetsid, pidFileName);
+ result.add(mergedCmd);
+ return result;
+ }
+
+ /**
+ * Construct the command line for running the task JVM
+ * @param setup The setup commands for the execed process.
+ * @param cmd The command and the arguments that should be run
+ * @param stdoutFilename The filename that stdout should be saved to
+ * @param stderrFilename The filename that stderr should be saved to
+ * @param tailLength The length of the tail to be saved.
+ * @param pidFileName The name of the pid-file
+ * @return the command line as a String
+ * @throws IOException
+ */
+ static String buildCommandLine(List<String> setup, List<String> cmd,
+ File stdoutFilename,
+ File stderrFilename,
+ long tailLength,
+ boolean useSetsid, String pidFileName)
+ throws IOException {
+
+ String stdout = FileUtil.makeShellPath(stdoutFilename);
+ String stderr = FileUtil.makeShellPath(stderrFilename);
StringBuffer mergedCmd = new StringBuffer();
// Spit out the pid to pidFileName
@@ -524,10 +549,9 @@
mergedCmd.append(" 2>> ");
mergedCmd.append(stderr);
}
- result.add(mergedCmd.toString());
- return result;
+ return mergedCmd.toString();
}
-
+
/**
* Add quotes to each of the command strings and
* return as a single string
@@ -594,4 +618,13 @@
return result;
}
+ /**
+ * Method to return the location of user log directory.
+ *
+ * @return base log directory
+ */
+ static File getUserLogDir() {
+ return LOG_DIR;
+ }
+
} // TaskLog
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Apr 16 18:29:35 2009
@@ -97,6 +97,7 @@
*******************************************************/
public class TaskTracker
implements MRConstants, TaskUmbilicalProtocol, Runnable {
+
static final long WAIT_FOR_DONE = 3 * 1000;
private int httpPort;
@@ -133,6 +134,8 @@
// last heartbeat response recieved
short heartbeatResponseId = -1;
+
+ static final String TASK_CLEANUP_SUFFIX = ".cleanup";
/*
* This is the last 'status' report sent by this tracker to the JobTracker.
@@ -263,7 +266,12 @@
private int probe_sample_size = 500;
private IndexCache indexCache;
-
+
+ /**
+ * Handle to the specific instance of the {@link TaskController} class
+ */
+ private TaskController taskController;
+
/*
* A list of commitTaskActions for whom commit response has been received
*/
@@ -371,7 +379,11 @@
}
}
}, "taskCleanup");
-
+
+ TaskController getTaskController() {
+ return taskController;
+ }
+
private RunningJob addTaskToJob(JobID jobId,
TaskInProgress tip) {
synchronized (runningJobs) {
@@ -431,7 +443,7 @@
boolean isCleanupAttempt) {
String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
if (isCleanupAttempt) {
- taskDir = taskDir + ".cleanup";
+ taskDir = taskDir + TASK_CLEANUP_SUFFIX;
}
return taskDir;
}
@@ -590,6 +602,15 @@
reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
mapLauncher.start();
reduceLauncher.start();
+ Class<? extends TaskController> taskControllerClass
+ = fConf.getClass("mapred.task.tracker.task-controller",
+ DefaultTaskController.class,
+ TaskController.class);
+ taskController = (TaskController)ReflectionUtils.newInstance(
+ taskControllerClass, fConf);
+
+ //setup and create jobcache directory with appropriate permissions
+ taskController.setup();
}
public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -874,6 +895,7 @@
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
rjob.jobConf = localJobConf;
+ taskController.initializeJob(jobId);
}
}
launchTaskForJob(tip, new JobConf(rjob.jobConf));
@@ -1446,6 +1468,7 @@
synchronized(runningJobs) {
runningJobs.remove(jobId);
}
+
}