Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/04/16 20:29:36 UTC

svn commit: r765713 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/filecache/ src/core/org/apache/hadoop/fs/ src/docs/src/documentation/content/xdocs/ src/mapred/ src/mapred/org/apache/hadoop/mapred/

Author: yhemanth
Date: Thu Apr 16 18:29:35 2009
New Revision: 765713

URL: http://svn.apache.org/viewvc?rev=765713&view=rev
Log:
HADOOP-4490. Provide ability to run tasks as job owners. Contributed by Sreekanth Ramakrishnan.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/core/trunk/src/mapred/mapred-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 16 18:29:35 2009
@@ -237,6 +237,9 @@
     without having to restart the daemon.
     (Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli via yhemanth)
 
+    HADOOP-4490. Provide ability to run tasks as job owners.
+    (Sreekanth Ramakrishnan via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Thu Apr 16 18:29:35 2009
@@ -428,7 +428,8 @@
       
       // do chmod here 
       try {
-    	FileUtil.chmod(parchive.toString(), "+x");
+        //Setting recursive permission to grant everyone read and execute
+        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
       } catch(InterruptedException e) {
     	LOG.warn("Exception in chmod" + e.toString());
       }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileUtil.java Thu Apr 16 18:29:35 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.mortbay.log.Log;
 
 /**
  * A collection of file-processing util methods
@@ -706,12 +707,42 @@
    */
   public static int chmod(String filename, String perm
                           ) throws IOException, InterruptedException {
-    String cmd = "chmod " + perm + " " + filename;
-    Process p = Runtime.getRuntime().exec(cmd, null);
-    return p.waitFor();
+    return chmod(filename, perm, false);
   }
 
   /**
+   * Change the permissions on a file / directory, recursively, if
+   * needed.
+   * @param filename name of the file whose permissions are to change
+   * @param perm permission string
+   * @param recursive true, if permissions should be changed recursively
+   * @return the exit code from the command.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public static int chmod(String filename, String perm, boolean recursive)
+                            throws IOException, InterruptedException {
+    StringBuffer cmdBuf = new StringBuffer();
+    cmdBuf.append("chmod ");
+    if (recursive) {
+      cmdBuf.append("-R ");
+    }
+    cmdBuf.append(perm).append(" ");
+    cmdBuf.append(filename);
+    String[] shellCmd = {"bash", "-c", cmdBuf.toString()};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(shellCmd);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      if(Log.isDebugEnabled()) {
+        Log.debug("Error while changing permission : " + filename 
+            +" Exception: " + StringUtils.stringifyException(e));
+      }
+    }
+    return shExec.getExitCode();
+  }
+  
+  /**
    * Create a tmp file for a base file.
    * @param basefile the base file of the tmp
    * @param prefix file name prefix of tmp
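
For illustration, a minimal sketch of how a caller might use the new
recursive chmod overload added above. The path is hypothetical; "ugo+rx"
mirrors the permissions DistributedCache now sets on localized archives.

    import org.apache.hadoop.fs.FileUtil;

    public class RecursiveChmodExample {
      public static void main(String[] args) throws Exception {
        // Recursively grant read and execute to user, group and others
        // under a hypothetical localized cache directory.
        int exitCode = FileUtil.chmod("/tmp/hadoop/cache/archive", "ugo+rx", true);
        if (exitCode != 0) {
          // a non-zero code means the underlying "chmod -R" command failed
          System.err.println("chmod failed with exit code " + exitCode);
        }
      }
    }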

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Thu Apr 16 18:29:35 2009
@@ -474,6 +474,122 @@
             </ul>
           </section>
           
+          <section>
+            <title>Task Controllers</title>
+            <p>Task controllers are classes in the Hadoop Map/Reduce 
+            framework that define how users' map and reduce tasks 
+            are launched and controlled. They can 
+            be used in clusters that require some customization in 
+            the process of launching or controlling user tasks.
+            For example, in some 
+            clusters, there may be a requirement to run tasks as 
+            the user who submitted the job, instead of as the task 
+            tracker user, which is how tasks are launched by default.
+            This section describes how to configure and use 
+            task controllers.</p>
+            <p>The following task controllers are available in
+            Hadoop.
+            </p>
+            <table>
+            <tr><th>Name</th><th>Class Name</th><th>Description</th></tr>
+            <tr>
+            <td>DefaultTaskController</td>
+            <td>org.apache.hadoop.mapred.DefaultTaskController</td>
+            <td> The default task controller which Hadoop uses to manage task
+            execution. The tasks run as the task tracker user.</td>
+            </tr>
+            <tr>
+            <td>LinuxTaskController</td>
+            <td>org.apache.hadoop.mapred.LinuxTaskController</td>
+            <td>This task controller, which is supported only on Linux, 
+            runs the tasks as the user who submitted the job. It requires
+            these user accounts to be created on the cluster nodes 
+            where the tasks are launched. It 
+            uses a setuid executable that is included in the Hadoop
+            distribution. The task tracker uses this executable to 
+            launch and kill tasks. The setuid executable switches to
+            the user who has submitted the job and launches or kills
+            the tasks. Currently, this task controller 
+            opens up permissions to local files and directories used 
+            by the tasks, such as the job jar files, distributed archive 
+            files, intermediate files and task log files. In the future,
+            stricter file permissions are expected to be used.
+            </td>
+            </tr>
+            </table>
+            <section>
+            <title>Configuring Task Controllers</title>
+            <p>The task controller to be used can be configured by setting the
+            value of the following key in mapred-site.xml:</p>
+            <table>
+            <tr>
+            <th>Property</th><th>Value</th><th>Notes</th>
+            </tr>
+            <tr>
+            <td>mapred.task.tracker.task-controller</td>
+            <td>Fully qualified class name of the task controller class</td>
+            <td>Currently there are two implementations of task controller
+            in Hadoop: DefaultTaskController and LinuxTaskController.
+            Use the fully qualified class names listed above as the
+            value for the implementation of choice.
+            </td>
+            </tr>
+            </table>
+            </section>
+            <section>
+            <title>Using the LinuxTaskController</title>
+            <p>This section of the document describes the steps required to
+            use the LinuxTaskController.</p>
+            
+            <p>In order to use the LinuxTaskController, a setuid executable
+            should be built and deployed on the compute nodes. The
+            executable is named task-controller. To build the executable, 
+            execute 
+            <em>ant task-controller -Dhadoop.conf.dir=/path/to/conf/dir.
+            </em>
+            The path passed in <em>-Dhadoop.conf.dir</em> should be the path
+            on the cluster nodes where a configuration file for the setuid
+            executable will be located. The executable is built in
+            <em>build.dir/dist.dir/bin</em> and should be installed in 
+            <em>$HADOOP_HOME/bin</em>.
+            </p>
+            
+            <p>
+            The executable must be deployed as a setuid executable, by changing
+            the ownership to <em>root</em> and giving it permissions <em>4755</em>. 
+            </p>
+            
+            <p>The executable requires a configuration file called 
+            <em>taskcontroller.cfg</em> to be
+            present in the configuration directory passed to the ant target 
+            mentioned above. If the binary was not built with a specific 
+            conf directory, the path defaults to <em>/path-to-binary/../conf</em>.
+            </p>
+            
+            <p>The executable requires the following configuration items to be 
+            present in the <em>taskcontroller.cfg</em> file. The items should
+            be specified as simple <em>key=value</em> pairs.
+            </p>
+            <table><tr><th>Name</th><th>Description</th></tr>
+            <tr>
+            <td>mapred.local.dir</td>
+            <td>Path to the mapred local directories. Should be the same as 
+            the value of <em>mapred.local.dir</em> in mapred-site.xml. This is
+            required to validate the paths passed to the setuid executable,
+            in order to prevent arbitrary paths being passed to it.</td>
+            </tr>
+            </table>
+
+            <p>
+            The LinuxTaskController requires that the paths leading up to
+            the directories specified in <em>mapred.local.dir</em> and
+            <em>hadoop.log.dir</em> have 755 permissions, and that the
+            directories themselves have 777 permissions.
+            </p>
+            </section>
+            
+          </section>
+          
         </section>
         
         <section>
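
As a worked counterpart to the configuration steps documented above, here
is a minimal sketch of how the configured controller is resolved, mirroring
the TaskTracker change later in this commit. The wrapper class is
hypothetical, and it sits in org.apache.hadoop.mapred because
TaskController is package-private.

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    class TaskControllerResolver {
      static TaskController resolve(Configuration conf) {
        // Read mapred.task.tracker.task-controller, falling back to
        // DefaultTaskController when the key is unset.
        Class<? extends TaskController> clazz =
            conf.getClass("mapred.task.tracker.task-controller",
                          DefaultTaskController.class,
                          TaskController.class);
        // ReflectionUtils injects conf via setConf(), since
        // TaskController implements Configurable.
        return ReflectionUtils.newInstance(clazz, conf);
      }
    }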

Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Thu Apr 16 18:29:35 2009
@@ -970,4 +970,11 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.task.tracker.task-controller</name>
+  <value>org.apache.hadoop.mapred.DefaultTaskController</value>
+  <description>TaskController which is used to launch and manage task execution 
+  </description>
+</property>
+
 </configuration>

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=765713&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Thu Apr 16 18:29:35 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The default implementation for controlling tasks.
+ * 
+ * This class provides an implementation for launching and killing 
+ * tasks that need to be run as the tasktracker itself. Hence,
+ * many of the initializing or cleanup methods are not required here.
+ */
+class DefaultTaskController extends TaskController {
+
+  private static final Log LOG = 
+      LogFactory.getLog(DefaultTaskController.class);
+  /**
+   * Launch a new JVM for the task.
+   * 
+   * This method launches the new JVM for the task by executing the
+   * JVM command using the {@link Shell.ShellCommandExecutor}.
+   */
+  void launchTaskJVM(TaskController.TaskControllerContext context) 
+                                      throws IOException {
+    JvmEnv env = context.env;
+    List<String> wrappedCommand = 
+      TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
+          env.logSize, true, env.pidFile);
+    ShellCommandExecutor shexec = 
+        new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
+                                  env.workDir, env.env);
+    // set the ShellCommandExecutor for later use.
+    context.shExec = shexec;
+    shexec.execute();
+  }
+  
+  /**
+   * Kills the JVM running the task stored in the context.
+   * 
+   * @param context the context storing the task running within the JVM
+   * that needs to be killed.
+   */
+  void killTaskJVM(TaskController.TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    JvmEnv env = context.env;
+    if (shexec != null) {
+      Process process = shexec.getProcess();
+      if (process != null) {
+        if (Shell.WINDOWS) {
+          process.destroy();
+        }
+        else {
+          Path pidFilePath = new Path(env.pidFile);
+          String pid = ProcessTree.getPidFromPidFile(
+                                                pidFilePath.toString());
+          if (pid != null) {
+            long sleeptimeBeforeSigkill = env.conf.getLong(
+                 "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                 ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
+                     ProcessTree.isSetsidAvailable, false);
+            try {
+              LOG.info("Process exited with exit code:" + process.waitFor());
+            } catch (InterruptedException ie) {}
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Initialize the task environment.
+   * 
+   * Since tasks are launched as the tasktracker user itself, this
+   * method has no action to perform.
+   */
+  void initializeTask(TaskController.TaskControllerContext context) {
+    // The default task controller does not need to set up
+    // any permissions for proper execution.
+    // So this is a dummy method.
+    return;
+  }
+  
+
+  @Override
+  void setup() {
+    // nothing to setup
+    return;
+  }
+
+  /*
+   * No need to do anything, as we don't need anything extra from
+   * what the TaskTracker has already done.
+   */
+  @Override
+  void initializeJob(JobID jobId) {
+  }
+  
+}
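
The kill path above waits a configurable grace period after SIGTERM before
escalating to SIGKILL. A hedged sketch of tuning it follows; the key comes
from the code above, while the millisecond unit and the fallback value are
assumptions.

    import org.apache.hadoop.conf.Configuration;

    public class SigkillGraceExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Grace period between SIGTERM and SIGKILL for task JVMs
        // (assumed milliseconds; ProcessTree's default applies if unset).
        conf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 10000L);
        long grace = conf.getLong(
            "mapred.tasktracker.tasks.sleeptime-before-sigkill", 5000L);
        System.out.println("grace period before SIGKILL: " + grace + " ms");
      }
    }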

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Thu Apr 16 18:29:35 2009
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -54,9 +55,9 @@
   
   public JvmManager(TaskTracker tracker) {
     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
-        true);
+        true, tracker);
     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
-        false);
+        false, tracker);
   }
   
   public void stop() {
@@ -124,11 +125,15 @@
     int maxJvms;
     boolean isMap;
     
+    TaskTracker tracker;
+    
     Random rand = new Random(System.currentTimeMillis());
 
-    public JvmManagerForType(int maxJvms, boolean isMap) {
+    public JvmManagerForType(int maxJvms, boolean isMap, 
+        TaskTracker tracker) {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
+      this.tracker = tracker;
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -140,7 +145,23 @@
     
     synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
       if (jvmToRunningTask.containsKey(jvmId)) {
-        return jvmToRunningTask.get(jvmId).getTaskInProgress();
+        //In case of JVM reuse, tasks are returned to a previously launched
+        //JVM via this method. However, when a new task is launched,
+        //the task being returned has to be initialized.
+        TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+        Task task = taskRunner.getTaskInProgress().getTask();
+        TaskControllerContext context = 
+          new TaskController.TaskControllerContext();
+        context.env = jvmRunner.env;
+        context.task = task;
+        //If we are returning the same task with which the JVM was launched,
+        //we don't initialize the task once again.
+        if(!jvmRunner.env.conf.get("mapred.task.id").
+            equals(task.getTaskID().toString())) {
+          tracker.getTaskController().initializeTask(context);
+        }
+        return taskRunner.getTaskInProgress();
       }
       return null;
     }
@@ -254,7 +275,7 @@
       }
       //*MUST* never reach this
       throw new RuntimeException("Inconsistent state!!! " +
-      		"JVM Manager reached an unstable state " +
+          "JVM Manager reached an unstable state " +
             "while reaping a JVM for task: " + t.getTask().getTaskID()+
             " " + getDetails());
     }
@@ -317,6 +338,9 @@
       JVMId jvmId;
       volatile boolean busy = true;
       private ShellCommandExecutor shexec; // shell terminal for running the task
+      // context used for starting the JVM
+      private TaskControllerContext initialContext;
+      
       public JvmRunner(JvmEnv env, JobID jobId) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
@@ -328,18 +352,19 @@
       }
 
       public void runChild(JvmEnv env) {
+        initialContext = new TaskControllerContext();
         try {
           env.vargs.add(Integer.toString(jvmId.getId()));
-          List<String> wrappedCommand = 
-            TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-                env.logSize, true, env.pidFile);
-          shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
-              env.workDir, env.env);
-          shexec.execute();
+          // Launch the task controller to run the task JVM
+          initialContext.task = jvmToRunningTask.get(jvmId).getTask();
+          initialContext.env = env;
+          tracker.getTaskController().initializeTask(initialContext);
+          tracker.getTaskController().launchTaskJVM(initialContext);
         } catch (IOException ioe) {
           // do nothing
           // error and output are appropriately redirected
         } finally { // handle the exit code
+          shexec = initialContext.shExec;
           if (shexec == null) {
             return;
           }
@@ -364,29 +389,14 @@
        * of processes) is created using setsid.
        */
       public void kill() {
-        if (shexec != null) {
-          Process process = shexec.getProcess();
-          if (process != null) {
-            if (Shell.WINDOWS) {
-              process.destroy();
-            }
-            else {
-              Path pidFilePath = new Path(env.pidFile);
-              String pid = ProcessTree.getPidFromPidFile(
-                                                    pidFilePath.toString());
-              if (pid != null) {
-                long sleeptimeBeforeSigkill = env.conf.getLong(
-                     "mapred.tasktracker.tasks.sleeptime-before-sigkill",
-                     ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-                ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
-                         ProcessTree.isSetsidAvailable, false);
-                try {
-                  LOG.info("Process exited with exit code:" + process.waitFor());
-                } catch (InterruptedException ie) {}
-              }
-            }
-          }
+        TaskController controller = tracker.getTaskController();
+        // Check the initial context before issuing a kill, to prevent
+        // situations where a kill is issued before the task is launched.
+        if (initialContext != null && initialContext.env != null) {
+          controller.killTaskJVM(initialContext);
+        } else {
+          LOG.info(String.format("JVM %s not killed, just removed",
+              jvmId.toString()));
         }
         removeJvm(jvmId);
       }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=765713&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Thu Apr 16 18:29:35 2009
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A {@link TaskController} that runs the task JVMs as the user 
+ * who submits the job.
+ * 
+ * This class executes a setuid executable to implement methods
+ * of the {@link TaskController}, including launching the task 
+ * JVM and killing it when needed, and also initializing and
+ * finalizing the task environment. 
+ * <p> The setuid executable is launched using the command line:</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
+ * <p>command is the ordinal value of one of the 
+ * {@link LinuxTaskController.TaskCommands} enumeration constants</p>
+ * <p>command-args depends on the command being launched.</p>
+ * 
+ * In addition to running and killing tasks, the class also 
+ * sets up appropriate access for the directories and files 
+ * that will be used by the tasks. 
+ */
+class LinuxTaskController extends TaskController {
+
+  private static final Log LOG = 
+            LogFactory.getLog(LinuxTaskController.class);
+
+  // Name of the executable script that will contain the child
+  // JVM command line. See writeCommand for details.
+  private static final String COMMAND_FILE = "taskjvm.sh";
+  
+  // Path to the setuid executable.
+  private static String taskControllerExe;
+  
+  static {
+    // the task-controller is expected to be under the $HADOOP_HOME/bin
+    // directory.
+    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+    taskControllerExe = 
+        new File(hadoopBin, "task-controller").getAbsolutePath();
+  }
+  
+  // The list of directory paths specified in the
+  // variable mapred.local.dir. This is used to determine
+  // which among the list of directories is picked up
+  // for storing data for a particular task.
+  private String[] mapredLocalDirs;
+  
+  // permissions to set on files and directories created.
+  // When localized files are handled securely, this string
+  // will change to something more restrictive. Until then,
+  // it opens up the permissions for all, so that the tasktracker
+  // and job owners can access files together.
+  private static final String FILE_PERMISSIONS = "ugo+rwx";
+  
+  // permissions to set on components of the path leading to
+  // localized files and directories. Read and execute permissions
+  // are required for different users to be able to access the
+  // files.
+  private static final String PATH_PERMISSIONS = "go+rx";
+  
+  public LinuxTaskController() {
+    super();
+  }
+  
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    mapredLocalDirs = conf.getStrings("mapred.local.dir");
+    //Setting of the permissions of the local directory is done in 
+    //setup()
+  }
+  
+  /**
+   * List of commands that the setuid script will execute.
+   */
+  enum TaskCommands {
+    LAUNCH_TASK_JVM,
+    KILL_TASK_JVM
+  }
+  
+  /**
+   * Launch a task JVM that will run as the owner of the job.
+   * 
+   * This method launches a task JVM by executing a setuid
+   * executable that will switch to the user and run the
+   * task.
+   */
+  @Override
+  void launchTaskJVM(TaskController.TaskControllerContext context) 
+                                        throws IOException {
+    JvmEnv env = context.env;
+    // get the JVM command line.
+    String cmdLine = 
+      TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
+          env.logSize, true, env.pidFile);
+
+    // write the command to a file in the
+    // task specific cache directory
+    writeCommand(cmdLine, getTaskCacheDirectory(context));
+    
+    // Call the taskcontroller with the right parameters.
+    List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
+    ShellCommandExecutor shExec =  buildTaskControllerExecutor(
+                                    TaskCommands.LAUNCH_TASK_JVM, 
+                                    env.conf.getUser(),
+                                    launchTaskJVMArgs, env);
+    context.shExec = shExec;
+    shExec.execute();
+    LOG.debug("output after executing task jvm = " + shExec.getOutput());
+  }
+
+  // convenience API for building command arguments for specific commands
+  private List<String> buildTaskCommandArgs(TaskControllerContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    String taskId = context.task.getTaskID().toString();
+    String jobId = getJobId(context);
+    commandArgs.add(jobId);
+    if(!context.task.isTaskCleanupTask()) {
+      commandArgs.add(taskId);
+    }else {
+      commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+    }
+    
+    LOG.debug("getting the task directory as: " 
+                + getTaskCacheDirectory(context));
+    commandArgs.add(getDirectoryChosenForTask(
+                              new File(getTaskCacheDirectory(context)), 
+                              context));
+    return commandArgs;
+  }
+  
+  // get the Job ID from the information in the TaskControllerContext
+  private String getJobId(TaskControllerContext context) {
+    String taskId = context.task.getTaskID().toString();
+    TaskAttemptID tId = TaskAttemptID.forName(taskId);
+    String jobId = tId.getJobID().toString();
+    return jobId;
+  }
+
+  // Get the directory from the list of directories configured
+  // in mapred.local.dir chosen for storing data pertaining to
+  // this task.
+  private String getDirectoryChosenForTask(File directory,
+                                            TaskControllerContext context) {
+    String jobId = getJobId(context);
+    String taskId = context.task.getTaskID().toString();
+    for (String dir : mapredLocalDirs) {
+      File mapredDir = new File(dir);
+      File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
+          jobId, taskId, context.task.isTaskCleanupTask()));
+      if (directory.equals(taskDir)) {
+        return dir;
+      }
+    }
+    
+    LOG.error("Couldn't parse task cache directory correctly");
+    throw new IllegalArgumentException("invalid task cache directory "
+                + directory.getAbsolutePath());
+  }
+  
+  /**
+   * Kill a launched task JVM running as the user of the job.
+   * 
+   * This method will launch the task controller setuid executable
+   * that in turn will kill the task JVM by sending a kill signal.
+   */
+  void killTaskJVM(TaskControllerContext context) {
+   
+    if(context.task == null) {
+      LOG.info("Context task null not killing the JVM");
+      return;
+    }
+    
+    JvmEnv env = context.env;
+    List<String> killTaskJVMArgs = buildTaskCommandArgs(context);
+    try {
+      ShellCommandExecutor shExec = buildTaskControllerExecutor(
+                                      TaskCommands.KILL_TASK_JVM,
+                                      context.env.conf.getUser(),
+                                      killTaskJVMArgs, 
+                                      context.env);
+      shExec.execute();
+      LOG.debug("Command output :" +shExec.getOutput());
+    } catch (IOException ioe) {
+      LOG.warn("IOException in killing task: " + ioe.getMessage());
+    }
+  }
+
+  /**
+   * Setup appropriate permissions for directories and files that
+   * are used by the task.
+   * 
+   * As the LinuxTaskController launches tasks as a user, different
+   * from the daemon, all directories and files that are potentially 
+   * used by the tasks are setup with appropriate permissions that
+   * will allow access.
+   * 
+   * Until secure data handling is implemented (see, for example,
+   * HADOOP-4491 and HADOOP-4493), the permissions are set up to allow
+   * read, write and execute access for everyone. This will be 
+   * changed to restricted access as data is handled securely.
+   */
+  void initializeTask(TaskControllerContext context) {
+    // Setup permissions for the job and task cache directories.
+    setupTaskCacheFileAccess(context);
+    // setup permissions for task log directory
+    setupTaskLogFileAccess(context);    
+  }
+  
+  // Allows access for the task to create log files under 
+  // the task log directory
+  private void setupTaskLogFileAccess(TaskControllerContext context) {
+    TaskAttemptID taskId = context.task.getTaskID();
+    File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
+    String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
+    changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
+  }
+
+  // Allows access for the task to read, write and execute 
+  // the files under the job and task cache directories
+  private void setupTaskCacheFileAccess(TaskControllerContext context) {
+    String taskId = context.task.getTaskID().toString();
+    JobID jobId = JobID.forName(getJobId(context));
+    //Change permission for the task across all the disks
+    for(String localDir : mapredLocalDirs) {
+      File f = new File(localDir);
+      File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
+          jobId.toString(), taskId, context.task.isTaskCleanupTask()));
+      if(taskCacheDir.exists()) {
+        changeDirectoryPermissions(taskCacheDir.getPath(), 
+            FILE_PERMISSIONS, true);
+      }          
+    }//end of local directory Iteration 
+  }
+
+  // convenience method to execute chmod.
+  private void changeDirectoryPermissions(String dir, String mode, 
+                                              boolean isRecursive) {
+    int ret = 0;
+    try {
+      ret = FileUtil.chmod(dir, mode, isRecursive);
+    } catch (Exception e) {
+      LOG.warn("Exception in changing permissions for directory " + dir + 
+                  ". Exception: " + e.getMessage());
+    }
+    if (ret != 0) {
+      LOG.warn("Could not change permissions for directory " + dir);
+    }
+  }
+  
+  // convenience API to create the executor for launching the
+  // setuid script.
+  private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
+                                          String userName, 
+                                          List<String> cmdArgs, JvmEnv env) 
+                                    throws IOException {
+    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
+    taskControllerCmd[0] = taskControllerExe;
+    taskControllerCmd[1] = userName;
+    taskControllerCmd[2] = String.valueOf(command.ordinal());
+    int i = 3;
+    for (String cmdArg : cmdArgs) {
+      taskControllerCmd[i++] = cmdArg;
+    }
+    if (LOG.isDebugEnabled()) {
+      for (String cmd : taskControllerCmd) {
+        LOG.debug("taskctrl command = " + cmd);
+      }
+    }
+    ShellCommandExecutor shExec = null;
+    if(env.workDir != null && env.workDir.exists()) {
+      shExec = new ShellCommandExecutor(taskControllerCmd,
+          env.workDir, env.env);
+    } else {
+      shExec = new ShellCommandExecutor(taskControllerCmd);
+    }
+    
+    return shExec;
+  }
+  
+  // Return the task specific directory under the cache.
+  private String getTaskCacheDirectory(TaskControllerContext context) {
+    // In the case of JVM reuse, the task specific directory
+    // is different from what is set in env.workDir.
+    // Hence, build it from the taskId every time.
+    String taskId = context.task.getTaskID().toString();
+    File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
+    if(context.task.isTaskCleanupTask()) {
+      taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
+    }
+    return new File(cacheDirForJob, taskId).getAbsolutePath(); 
+  }
+  
+  // Write the JVM command line to a file under the specified directory
+  // Note that the JVM will be launched using a setuid executable, and the
+  // command line could potentially contain strings defined by a user. Hence, to
+  // prevent special character attacks, we write the command line to
+  // a file and execute it.
+  private void writeCommand(String cmdLine, 
+                                      String directory) throws IOException {
+    
+    PrintWriter pw = null;
+    String commandFile = directory + File.separator + COMMAND_FILE;
+    LOG.info("Writing commands to " + commandFile);
+    try {
+      FileWriter fw = new FileWriter(commandFile);
+      BufferedWriter bw = new BufferedWriter(fw);
+      pw = new PrintWriter(bw);
+      pw.write(cmdLine);
+    } catch (IOException ioe) {
+      LOG.error("Caught IOException while writing JVM command line to file. "
+                + ioe.getMessage());
+    } finally {
+      if (pw != null) {
+        pw.close();
+      }
+      // set execute permissions for all on the file.
+      File f = new File(commandFile);
+      if (f.exists()) {
+        f.setReadable(true, false);
+        f.setExecutable(true, false);
+      }
+    }
+  }
+  
+
+  /**
+   * Sets up the permissions of the following directories:
+   * 
+   * Job cache directory
+   * Archive directory
+   * Hadoop log directories
+   * 
+   */
+  @Override
+  void setup() {
+    //set up job cache directory and associated permissions
+    String localDirs[] = this.mapredLocalDirs;
+    for(String localDir : localDirs) {
+      //Cache root
+      File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
+      File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
+      if(!cacheDirectory.exists()) {
+        if(!cacheDirectory.mkdirs()) {
+          LOG.warn("Unable to create cache directory : " + 
+              cacheDirectory.getPath());
+        }
+      }
+      if(!jobCacheDirectory.exists()) {
+        if(!jobCacheDirectory.mkdirs()) {
+          LOG.warn("Unable to create job cache directory : " + 
+              jobCacheDirectory.getPath());
+        }
+      }
+      //Give world-writable permissions to every directory under
+      //mapred.local.dir, because the child tries to write files
+      //under it when executing.
+      changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
+    }//end of local directory manipulations
+    //setting up perms for user logs
+    File taskLog = TaskLog.getUserLogDir();
+    changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+  }
+
+  /*
+   * Create job directories across disks and set their permissions to 777.
+   * This way, when tasks are run, we only need to set up permissions
+   * for the task folder.
+   */
+  @Override
+  void initializeJob(JobID jobid) {
+    for(String localDir : this.mapredLocalDirs) {
+      File jobDirectory = new File(localDir, 
+          TaskTracker.getLocalJobDir(jobid.toString()));
+      if(!jobDirectory.exists()) {
+        if(!jobDirectory.mkdir()) {
+          LOG.warn("Unable to create job cache directory : " 
+              + jobDirectory.getPath());
+          continue;
+        }
+      }
+      //Should be recursive because the jar and work folders might be 
+      //present under the job cache directory
+      changeDirectoryPermissions(
+          jobDirectory.getPath(), FILE_PERMISSIONS, true);
+    }
+  }
+  
+}
+
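
To make the invocation protocol from the class javadoc concrete, here is a
hedged sketch of the argv that buildTaskControllerExecutor assembles for a
launch. Every concrete value is hypothetical; the numeric command is the
enum ordinal, so LAUNCH_TASK_JVM is "0" and KILL_TASK_JVM is "1".

    public class TaskControllerArgvExample {
      public static void main(String[] args) {
        // argv layout: executable, user name, command ordinal, command args
        String[] argv = {
            "/opt/hadoop/bin/task-controller",       // setuid executable
            "alice",                                 // user who submitted the job
            "0",                                     // TaskCommands.LAUNCH_TASK_JVM.ordinal()
            "job_200904161829_0001",                 // job id
            "attempt_200904161829_0001_m_000000_0",  // task attempt id
            "/grid/0/mapred/local"                   // mapred.local.dir entry for the task
        };
        for (String arg : argv) {
          System.out.println(arg);
        }
      }
    }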

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=765713&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java Thu Apr 16 18:29:35 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * Controls initialization, finalization and clean up of tasks, and
+ * also the launching and killing of task JVMs.
+ * 
+ * This class defines the API for initializing, finalizing and cleaning
+ * up tasks, as well as for launching and killing task JVMs.
+ * Subclasses of this class will implement the logic required for
+ * performing the actual actions. 
+ */
+abstract class TaskController implements Configurable {
+  
+  private Configuration conf;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
+  /**
+   * Setup task controller component.
+   * 
+   */
+  abstract void setup();
+  
+  
+  /**
+   * Launch a task JVM
+   * 
+   * This method defines how a JVM will be launched to run a task.
+   * @param context the context associated to the task
+   */
+  abstract void launchTaskJVM(TaskControllerContext context)
+                                      throws IOException;
+  
+  /**
+   * Kill a task JVM
+   * 
+   * This method defines how a JVM launched to execute one or more
+   * tasks will be killed.
+   * @param context
+   */
+  abstract void killTaskJVM(TaskControllerContext context);
+  
+  /**
+   * Perform initializing actions required before a task can run.
+   * 
+   * For instance, this method can be used to setup appropriate
+   * access permissions for files and directories that will be
+   * used by tasks. Tasks use the job cache, log, PID and distributed cache
+   * directories and files as part of their functioning. Typically,
+   * these files are shared between the daemon and the tasks
+   * themselves. So, a TaskController that is launching tasks
+   * as different users can implement this method to setup
+   * appropriate ownership and permissions for these directories
+   * and files.
+   */
+  abstract void initializeTask(TaskControllerContext context);
+  
+  
+  /**
+   * Contains task information required for the task controller.  
+   */
+  static class TaskControllerContext {
+    // task being executed
+    Task task; 
+    // the JVM environment for the task
+    JvmEnv env;
+    // the Shell executor executing the JVM for this task
+    ShellCommandExecutor shExec; 
+  }
+
+  /**
+   * Method which is called after the job is localized so that task controllers
+   * can implement their own job localization logic.
+   * 
+   * @param jobId the ID of the job for which localization happens.
+   */
+  abstract void initializeJob(JobID jobId);
+}
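
For implementors of new controllers, a minimal hedged sketch of a subclass
follows. The class name is hypothetical, the bodies are illustrative no-ops
modeled on DefaultTaskController above, and it must live in
org.apache.hadoop.mapred because the API is package-private.

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    class NoOpTaskController extends TaskController {
      @Override
      void setup() { /* one-time setup, e.g. directory creation */ }

      @Override
      void initializeJob(JobID jobId) { /* per-job localization hook */ }

      @Override
      void initializeTask(TaskControllerContext context) {
        // per-task setup, e.g. permissions on task directories
      }

      @Override
      void launchTaskJVM(TaskControllerContext context) throws IOException {
        // launch the child JVM described by context.env
      }

      @Override
      void killTaskJVM(TaskControllerContext context) {
        // kill the JVM launched for context.task
      }
    }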

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Thu Apr 16 18:29:35 2009
@@ -476,11 +476,36 @@
                                                 boolean useSetsid,
                                                 String pidFileName
                                                ) throws IOException {
-    String stdout = FileUtil.makeShellPath(stdoutFilename);
-    String stderr = FileUtil.makeShellPath(stderrFilename);
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
+    String mergedCmd = buildCommandLine(setup, cmd, stdoutFilename,
+                                                    stderrFilename, tailLength, 
+                                                    useSetsid, pidFileName);
+    result.add(mergedCmd);
+    return result;
+  }
+  
+  /**
+   * Construct the command line for running the task JVM
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @param pidFileName The name of the pid-file
+   * @return the command line as a String
+   * @throws IOException
+   */
+  static String buildCommandLine(List<String> setup, List<String> cmd, 
+                                      File stdoutFilename,
+                                      File stderrFilename,
+                                      long tailLength, 
+                                      boolean useSetsid, String pidFileName)
+                                throws IOException {
+    
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    String stderr = FileUtil.makeShellPath(stderrFilename);    
     StringBuffer mergedCmd = new StringBuffer();
     
     // Spit out the pid to pidFileName
@@ -524,10 +549,9 @@
       mergedCmd.append(" 2>> ");
       mergedCmd.append(stderr);
     }
-    result.add(mergedCmd.toString());
-    return result;
+    return mergedCmd.toString();
   }
-
+  
   /**
    * Add quotes to each of the command strings and
    * return as a single string 
@@ -594,4 +618,13 @@
     return result;
   }
   
+  /**
+   * Method to return the location of user log directory.
+   * 
+   * @return base log directory
+   */
+  static File getUserLogDir() {
+    return LOG_DIR;
+  }
+  
 } // TaskLog
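
A hedged sketch of calling the newly extracted buildCommandLine, in the
same way LinuxTaskController does above. The helper is hypothetical and
assumes the org.apache.hadoop.mapred package, since the method is
package-private.

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    class CommandLineHelper {
      // Build the merged shell command line for a task JVM from its
      // JvmEnv, exactly as LinuxTaskController.launchTaskJVM does.
      static String commandLineFor(JvmManager.JvmEnv env) throws IOException {
        return TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout,
            env.stderr, env.logSize, true, env.pidFile);
      }
    }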

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=765713&r1=765712&r2=765713&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Apr 16 18:29:35 2009
@@ -97,6 +97,7 @@
  *******************************************************/
 public class TaskTracker 
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
+  
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -133,6 +134,8 @@
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
+  
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
 
   /*
    * This is the last 'status' report sent by this tracker to the JobTracker.
@@ -263,7 +266,12 @@
   private int probe_sample_size = 500;
 
   private IndexCache indexCache;
-    
+
+  /**
+   * Handle to the specific instance of the {@link TaskController} class.
+   */
+  private TaskController taskController;
+  
   /*
    * A list of commitTaskActions for whom commit response has been received 
    */
@@ -371,7 +379,11 @@
           }
         }
       }, "taskCleanup");
-    
+
+  TaskController getTaskController() {
+    return taskController;
+  }
+  
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -431,7 +443,7 @@
                                 boolean isCleanupAttempt) {
 	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
 	if (isCleanupAttempt) { 
-      taskDir = taskDir + ".cleanup";
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
 	}
 	return taskDir;
   }
@@ -590,6 +602,15 @@
     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
     mapLauncher.start();
     reduceLauncher.start();
+    Class<? extends TaskController> taskControllerClass 
+                          = fConf.getClass("mapred.task.tracker.task-controller",
+                                            DefaultTaskController.class, 
+                                            TaskController.class); 
+    taskController = (TaskController)ReflectionUtils.newInstance(
+                                                      taskControllerClass, fConf);
+    
+    //setup and create jobcache directory with appropriate permissions
+    taskController.setup();
   }
   
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -874,6 +895,7 @@
                              localJobConf.getKeepFailedTaskFiles());
         rjob.localized = true;
         rjob.jobConf = localJobConf;
+        taskController.initializeJob(jobId);
       }
     }
     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
@@ -1446,6 +1468,7 @@
     synchronized(runningJobs) {
       runningJobs.remove(jobId);
     }
+    
   }