Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:27:25 UTC

svn commit: r1076975 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/util/ mapred/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/pipes/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop...

Author: omalley
Date: Fri Mar  4 03:27:24 2011
New Revision: 1076975

URL: http://svn.apache.org/viewvc?rev=1076975&view=rev
Log:
commit fe1e8a78747da4e18708387d9efdbe7146e44e7b
Author: Yahoo! <lt...@yahoo-inc.com>
Date:   Tue Aug 18 09:12:41 2009 -0700

    Apply patch for HADOOP-5488 from http://issues.apache.org/jira/secure/attachment/12414066/hadoop-5488-ydist.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java?rev=1076975&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java Fri Mar  4 03:27:24 2011
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/** 
+ * Process tree related operations
+ */
+public class ProcessTree {
+
+  private static final Log LOG = LogFactory.getLog(ProcessTree.class);
+
+  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+      return setsidSupported;
+    }
+  }
+
+  /**
+   * Kills the process (or process group) by sending the signal SIGKILL
+   * in the current thread.
+   * @param pid Process id (or process group id) of the process to be killed
+   * @param isProcessGroup Whether pid is a process group id
+   * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
+   *  sending SIGTERM
+   */
+  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
+      long sleepTimeBeforeSigKill) {
+    // Kill the subprocesses of root process(even if the root process is not
+    // alive) if process group is to be killed.
+    if (isProcessGroup || ProcessTree.isAlive(pid)) {
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        LOG.warn("Thread sleep is interrupted.");
+      }
+
+      ShellCommandExecutor shexec = null;
+
+      try {
+        String pid_pgrpid;
+        if(isProcessGroup) {//kill the whole process group
+          pid_pgrpid = "-" + pid;
+        }
+        else {//kill single process
+          pid_pgrpid = pid;
+        }
+        
+        String[] args = { "kill", "-9", pid_pgrpid };
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (IOException ioe) {
+        LOG.warn("Error executing shell command " + ioe);
+      } finally {
+        if(isProcessGroup) {
+          LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
+            + shexec.getExitCode());
+        }
+        else {
+          LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+                    + shexec.getExitCode());
+        }
+      }
+    }
+  }
+
+  /** Kills the process (or process group) by sending the signal SIGKILL
+   * @param pid Process id (or process group id) of the process to be killed
+   * @param isProcessGroup Whether pid is a process group id
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Process is to be killed in the background with
+   *                     a separate thread
+   */
+  private static void sigKill(String pid, boolean isProcessGroup,
+                        long sleeptimeBeforeSigkill, boolean inBackground) {
+
+    if(inBackground) { // use a separate thread for killing
+      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
+                                                      sleeptimeBeforeSigkill);
+      sigKillThread.setDaemon(true);
+      sigKillThread.start();
+    }
+    else {
+      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+    }
+  }
+
+  /** Destroy the process.
+   * @param pid Process id of the process to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Process is to be killed in the background with
+   *                     a separate thread
+   */
+  protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
+                                    boolean inBackground) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", pid };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command " + ioe);
+    } finally {
+      LOG.info("Killing process " + pid +
+               " with SIGTERM. Exit code " + shexec.getExitCode());
+    }
+    
+    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+  }
+  
+  /** Destroy the process group.
+   * @param pgrpId Process group id of the processes to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Process group is to be killed in the background with
+   *                     a separate thread
+   */
+  protected static void destroyProcessGroup(String pgrpId,
+                       long sleeptimeBeforeSigkill, boolean inBackground) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "--", "-" + pgrpId };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command " + ioe);
+    } finally {
+      LOG.info("Killing all processes in the process group " + pgrpId +
+               " with SIGTERM. Exit code " + shexec.getExitCode());
+    }
+    
+    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+  }
+
+  /**
+   * Destroy the process-tree.
+   * @param pid process id of the root process of the subtree of processes
+   *            to be killed
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param isProcessGroup pid is a process group leader or not
+   * @param inBackground Process is to be killed in the background with
+   *                     a separate thread
+   */
+  public static void destroy(String pid, long sleeptimeBeforeSigkill,
+                             boolean isProcessGroup, boolean inBackground) {
+    if(isProcessGroup) {
+      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+    }
+    else {
+      //TODO: Destroy all the processes in the subtree in this case also.
+      // For the time being, killing only the root process.
+      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+    }
+  }
+
+
+  /**
+   * Is the process with PID pid still alive?
+   * This method assumes that isAlive is called on a pid that was alive not
+   * too long ago, and hence assumes the pid has not wrapped around.
+   */
+  public static boolean isAlive(String pid) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "-0", pid };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (ExitCodeException ee) {
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command "
+          + Arrays.toString(shexec.getExecString()) + ioe);
+      return false;
+    }
+    return (shexec.getExitCode() == 0);
+  }
+
+  /**
+   * Helper thread class that kills process-tree with SIGKILL in background
+   */
+  static class SigKillThread extends Thread {
+    private String pid = null;
+    private boolean isProcessGroup = false;
+
+    private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+
+    private SigKillThread(String pid, boolean isProcessGroup, long interval) {
+      this.pid = pid;
+      this.isProcessGroup = isProcessGroup;
+      this.setName(this.getClass().getName() + "-" + pid);
+      sleepTimeBeforeSigKill = interval;
+    }
+
+    public void run() {
+      sigKillInCurrentThread(pid, isProcessGroup, sleepTimeBeforeSigKill);
+    }
+  }
+}

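For context, a minimal sketch of how a caller might drive the new ProcessTree API (hypothetical class name and pid; not part of this patch). It follows the same SIGTERM-then-SIGKILL sequence as the code above, with the same destroy(...) arguments DefaultTaskController.killTaskJVM uses further below:

    import org.apache.hadoop.util.ProcessTree;

    public class ProcessTreeUsageSketch {
      public static void main(String[] args) {
        // Hypothetical: pid of a task JVM that was launched under setsid.
        String pid = args[0];

        // Treat the pid as a process group id only when setsid is available,
        // wait the default 5000 ms between SIGTERM and SIGKILL, and run the
        // SIGKILL phase in the calling thread (inBackground == false).
        ProcessTree.destroy(pid,
            ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL,
            ProcessTree.isSetsidAvailable,
            false);

        System.out.println("still alive? " + ProcessTree.isAlive(pid));
      }
    }
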
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Fri Mar  4 03:27:24 2011
@@ -41,10 +41,10 @@ import org.apache.hadoop.util.Shell.Shel
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
  */
-public class ProcfsBasedProcessTree {
+public class ProcfsBasedProcessTree extends ProcessTree {
 
   private static final Log LOG = LogFactory
-      .getLog("org.apache.hadoop.mapred.ProcfsBasedProcessTree");
+      .getLog(ProcfsBasedProcessTree.class);
 
   private static final String PROCFS = "/proc/";
   public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
@@ -58,10 +58,21 @@ public class ProcfsBasedProcessTree {
   
   private Integer pid = -1;
 
+  private boolean setsidUsed = false;
+  
+  private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+  
   private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
-    this(pid, PROCFS);
+    this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+  }
+  
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
+      long sigkillInterval) {
+    this(pid,PROCFS);
+    this.setsidUsed = setsidUsed;
+    sleeptimeBeforeSigkill = sigkillInterval; 
   }
 
   public ProcfsBasedProcessTree(String pid, String procfsDir) {
@@ -69,6 +80,13 @@ public class ProcfsBasedProcessTree {
     this.procfsDir = procfsDir;
   }
   
+  /**
+   * Sets SIGKILL interval
+   * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
+   *                  String, boolean, long)} instead
+   * @param interval The time to wait before sending SIGKILL
+   *                 after sending SIGTERM
+   */
   public void setSigKillInterval(long interval) {
     sleepTimeBeforeSigKill = interval;
   }
@@ -170,47 +188,107 @@ public class ProcfsBasedProcessTree {
   }
 
   /**
-   * Is the process-tree alive? Currently we care only about the status of the
-   * root-process.
+   * Is the root-process alive?
    * 
-   * @return true if the process-true is alive, false otherwise.
+   * @return true if the root-process is alive, false otherwise.
    */
   public boolean isAlive() {
     if (pid == -1) {
       return false;
     } else {
-      return this.isAlive(pid);
+      return isAlive(pid.toString());
+    }
+  }
+
+  /**
+   * Is any of the subprocesses in the process-tree alive?
+   * 
+   * @return true if any of the processes in the process-tree is
+   *           alive, false otherwise.
+   */
+  public boolean isAnyProcessInTreeAlive() {
+    for (Integer pId : processTree.keySet()) {
+      if (isAlive(pId.toString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Verify that the given process id is the same as its process group id.
+   * @param pidStr Process id of the process to verify
+   */
+  private static boolean assertPidPgrpidForMatch(String pidStr) {
+    Integer pId = Integer.parseInt(pidStr);
+    // Get information for this process
+    ProcessInfo pInfo = new ProcessInfo(pId);
+    pInfo = constructProcessInfo(pInfo);
+    //make sure that pId and its pgrpId match
+    if (!pInfo.getPgrpId().equals(pId)) {
+      LOG.warn("Unexpected: Process with PID " + pId +
+               " is not a process group leader.");
+      return false;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(pId + " is a process group leader, as expected.");
+    }
+    return true;
+  }
+
+  /** Make sure that the given pid is a process group leader and then
+   * destroy the process group.
+   * @param pgrpId   Process group id of the processes to be killed
+   * @param interval The time to wait before sending SIGKILL
+   *                 after sending SIGTERM
+   * @param inBackground Process is to be killed in the background with
+   *                     a separate thread
+   */
+  public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
+                       boolean inBackground)
+         throws IOException {
+    // Make sure that the pid given is a process group leader
+    if (!assertPidPgrpidForMatch(pgrpId)) {
+      throw new IOException("Process with PID " + pgrpId  +
+                          " is not a process group leader.");
+    }
+    destroyProcessGroup(pgrpId, interval, inBackground);
   }
 
   /**
-   * Destroy the process-tree. Currently we only make sure the root process is
-   * gone. It is the responsibility of the root process to make sure that all
-   * its descendants are cleaned up.
+   * Destroy the process-tree.
    */
   public void destroy() {
+    destroy(true);
+  }
+  
+  /**
+   * Destroy the process-tree.
+   * @param inBackground Process is to be killed in the background with
+   *                     a separate thread
+   */
+  public void destroy(boolean inBackground) {
     LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
     if (pid == -1) {
       return;
     }
-    ShellCommandExecutor shexec = null;
 
-    if (isAlive(this.pid)) {
-      try {
-        String[] args = { "kill", this.pid.toString() };
-        shexec = new ShellCommandExecutor(args);
-        shexec.execute();
-      } catch (IOException ioe) {
-        LOG.warn("Error executing shell command " + ioe);
-      } finally {
-        LOG.info("Killing " + pid + " with SIGTERM. Exit code "
-            + shexec.getExitCode());
+    if (isAlive(pid.toString())) {
+      if (isSetsidAvailable && setsidUsed) {
+        // In this case, we know that pid got created using setsid. So kill the
+        // whole processGroup.
+        try {
+          assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
+                              inBackground);
+        } catch (IOException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+      else {
+        //TODO: Destroy all the processes in the subtree in this case also.
+        // For the time being, killing only the root process.
+        destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
       }
     }
-
-    SigKillThread sigKillThread = new SigKillThread();
-    sigKillThread.setDaemon(true);
-    sigKillThread.start();
   }
 
   /**
@@ -243,52 +321,7 @@ public class ProcfsBasedProcessTree {
     return total;
   }
 
-  /**
-   * Get PID from a pid-file.
-   * 
-   * @param pidFileName
-   *          Name of the pid-file.
-   * @return the PID string read from the pid-file. Returns null if the
-   *         pidFileName points to a non-existing file or if read fails from the
-   *         file.
-   */
-  public static String getPidFromPidFile(String pidFileName) {
-    BufferedReader pidFile = null;
-    FileReader fReader = null;
-    String pid = null;
-
-    try {
-      fReader = new FileReader(pidFileName);
-      pidFile = new BufferedReader(fReader);
-    } catch (FileNotFoundException f) {
-      LOG.debug("PidFile doesn't exist : " + pidFileName);
-      return pid;
-    }
-
-    try {
-      pid = pidFile.readLine();
-    } catch (IOException i) {
-      LOG.error("Failed to read from " + pidFileName);
-    } finally {
-      try {
-        if (fReader != null) {
-          fReader.close();
-        }
-        try {
-          if (pidFile != null) {
-            pidFile.close();
-          }
-        } catch (IOException i) {
-          LOG.warn("Error closing the stream " + pidFile);
-        }
-      } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
-      }
-    }
-    return pid;
-  }
-
-  private Integer getValidPID(String pid) {
+  private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
       retPid = Integer.parseInt((String) pid);
@@ -328,7 +361,7 @@ public class ProcfsBasedProcessTree {
    * Construct the ProcessInfo using the process' PID and procfs and return the
    * same. Returns null on failing to read from procfs,
    */
-  private ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
     return constructProcessInfo(pinfo, PROCFS);
   }
 
@@ -343,7 +376,7 @@ public class ProcfsBasedProcessTree {
    * @param procfsDir root of the proc file system
    * @return updated ProcessInfo, null on errors.
    */
-  private ProcessInfo constructProcessInfo(ProcessInfo pinfo, 
+  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo, 
                                                     String procfsDir) {
     ProcessInfo ret = null;
     // Read "procfsDir/<pid>/stat" file

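The new three-argument constructor above is how the tasktracker now builds process trees; a minimal usage sketch (hypothetical class name and pid):

    import org.apache.hadoop.util.ProcessTree;
    import org.apache.hadoop.util.ProcfsBasedProcessTree;

    public class ProcfsTreeSketch {
      public static void main(String[] args) {
        String pid = args[0];  // hypothetical: root pid of a task's process tree

        // Remember whether the JVM was started under setsid and how long to
        // wait between SIGTERM and SIGKILL.
        ProcfsBasedProcessTree tree = new ProcfsBasedProcessTree(
            pid, ProcessTree.isSetsidAvailable,
            ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);

        if (tree.isAlive()) {
          // 'true' defers the SIGKILL phase to a background thread, matching
          // the pTree.destroy(true) calls in TaskMemoryManagerThread below.
          tree.destroy(true);
        }
      }
    }
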
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 03:27:24 2011
@@ -198,12 +198,10 @@
 </property>
 
 <property>
-  <name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name>
+  <name>mapred.tasktracker.tasks.sleeptime-before-sigkill</name>
   <value>5000</value>
   <description>The time, in milliseconds, the tasktracker waits for sending a
-  SIGKILL to a process that has overrun memory limits, after it has been sent
-  a SIGTERM. Used only if tasks' memory management is enabled via
-  mapred.tasktracker.tasks.maxmemory.</description>
+  SIGKILL to a process, after it has been sent a SIGTERM.</description>
 </property>
 
 <property>

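The renamed key is read with a plain JobConf lookup, falling back to the 5000 ms default; a minimal sketch (hypothetical class name) of the same call the JvmManager and TaskMemoryManagerThread hunks below make:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ProcessTree;

    public class SigkillConfSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        long sleeptimeBeforeSigkill = conf.getLong(
            "mapred.tasktracker.tasks.sleeptime-before-sigkill",
            ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
        System.out.println("SIGTERM->SIGKILL wait: "
            + sleeptimeBeforeSigkill + " ms");
      }
    }
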
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:27:24 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.log4j.LogManager;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 /** 
@@ -45,7 +46,7 @@ import org.apache.hadoop.util.StringUtil
 class Child {
 
   public static final Log LOG =
-    LogFactory.getLog(TaskTracker.class);
+    LogFactory.getLog(Child.class);
 
   static volatile TaskAttemptID taskid = null;
   static volatile boolean isCleanup;
@@ -99,18 +100,18 @@ class Child {
     t.setName("Thread for syncLogs");
     t.setDaemon(true);
     t.start();
-    //for the memory management, a PID file is written and the PID file
-    //is written once per JVM. We simply symlink the file on a per task
-    //basis later (see below). Long term, we should change the Memory
-    //manager to use JVMId instead of TaskAttemptId
-    Path srcPidPath = null;
-    Path dstPidPath = null;
+    
+    String pid = "";
+    if (!Shell.WINDOWS) {
+      pid = System.getenv().get("JVM_PID");
+    }
+    JvmContext context = new JvmContext(jvmId, pid);
     int idleLoopCount = 0;
     Task task = null;
     try {
       while (true) {
         taskid = null;
-        JvmTask myTask = umbilical.getTask(jvmId);
+        JvmTask myTask = umbilical.getTask(context);
         if (myTask.shouldDie()) {
           break;
         } else {
@@ -137,18 +138,6 @@ class Child {
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
-        if (job.getBoolean("task.memory.mgmt.enabled", false)) {
-          if (srcPidPath == null) {
-            srcPidPath = new Path(task.getPidFile());
-          }
-          //since the JVM is running multiple tasks potentially, we need
-          //to do symlink stuff only for the subsequent tasks
-          if (!taskid.equals(firstTaskid)) {
-            dstPidPath = new Path(task.getPidFile());
-            FileUtil.symLink(srcPidPath.toUri().getPath(), 
-                dstPidPath.toUri().getPath());
-          }
-        }
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
@@ -170,11 +159,6 @@ class Child {
           task.run(job, umbilical);             // run the task
         } finally {
           TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
-          if (!taskid.equals(firstTaskid) && 
-              job.getBoolean("task.memory.mgmt.enabled", false)) {
-            // delete the pid-file's symlink
-            new File(dstPidPath.toUri().getPath()).delete();
-          }
         }
         if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
           break;

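The pid-file plumbing above is replaced by an environment handshake: the shell wrapper built by TaskLog (see its hunk below) exports JVM_PID before exec'ing the child, and the child passes it back inside a JvmContext. A minimal sketch of that handshake (hypothetical class, placed in org.apache.hadoop.mapred because JvmContext and JVMId are package-private):

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.util.Shell;

    class JvmPidHandshakeSketch {
      static JvmContext contextFor(JVMId jvmId) {
        String pid = "";
        if (!Shell.WINDOWS) {
          // Inherited from the wrapper's "export JVM_PID=`echo $$`".
          pid = System.getenv("JVM_PID");
        }
        return new JvmContext(jvmId, pid);
      }
    }
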
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 03:27:24 2011
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.util.List;
 
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -51,7 +53,7 @@ class DefaultTaskController extends Task
     JvmEnv env = context.env;
     List<String> wrappedCommand = 
       TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, env.pidFile);
+          env.logSize, true);
     ShellCommandExecutor shexec = 
         new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
                                   env.workDir, env.env);
@@ -68,13 +70,34 @@ class DefaultTaskController extends Task
    */
   void killTaskJVM(TaskController.TaskControllerContext context) {
     ShellCommandExecutor shexec = context.shExec;
+
     if (shexec != null) {
       Process process = shexec.getProcess();
-      if (process != null) {
-        process.destroy();
+      if (Shell.WINDOWS) {
+        // Currently we don't use setsid on WINDOWS. So kill the process alone.
+        if (process != null) {
+          process.destroy();
+        }
+      }
+      else { // In addition to the task JVM, kill its subprocesses also.
+        String pid = context.pid;
+        if (pid != null) {
+          ProcessTree.destroy(pid, context.sleeptimeBeforeSigkill,
+              ProcessTree.isSetsidAvailable, false);
+          try {
+            if (process != null) {
+              LOG.info("Process exited with exit code:" + process.waitFor());
+            }
+          } catch (InterruptedException ie) {}
+        }
+        else if (process != null) {
+          // kill the task JVM alone, if we don't have the process group id
+          process.destroy();
+        }
       }
     }
   }
+
   
   /**
    * Initialize the task environment.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar  4 03:27:24 2011
@@ -63,7 +63,7 @@ public class IsolationRunner {
       LOG.info("Task " + taskId + " reporting fatal error: " + msg);
     }
 
-    public JvmTask getTask(JVMId jvmId) throws IOException {
+    public JvmTask getTask(JvmContext context) throws IOException {
       return null;
     }
 

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java?rev=1076975&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java Fri Mar  4 03:27:24 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class JvmContext implements Writable {
+
+  public static final Log LOG =
+    LogFactory.getLog(JvmContext.class);
+  
+  JVMId jvmId;
+  String pid;
+  
+  JvmContext() {
+    jvmId = new JVMId();
+    pid = "";
+  }
+  
+  JvmContext(JVMId id, String pid) {
+    jvmId = id;
+    this.pid = pid;
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    jvmId.readFields(in);
+    this.pid = Text.readString(in);
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    jvmId.write(out);
+    Text.writeString(out, pid);
+  }
+}

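JvmContext is a plain Writable, so it travels over the existing umbilical RPC; a minimal round-trip sketch (hypothetical class name and pid value):

    package org.apache.hadoop.mapred;  // JvmContext is package-private

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class JvmContextRoundTrip {
      public static void main(String[] args) throws IOException {
        JvmContext sent = new JvmContext(new JVMId(), "12345");

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        sent.write(new DataOutputStream(buf));   // serialize, as the RPC layer would

        JvmContext received = new JvmContext();  // starts with empty fields
        received.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println("pid after round trip: " + received.pid);
      }
    }
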
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 03:27:24 2011
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 class JvmManager {
@@ -46,8 +47,8 @@ class JvmManager {
   
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
-      Map<String,String> env, String pidFile, JobConf conf) {
-    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
+      Map<String,String> env, JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
   }
   
   public JvmManager(TaskTracker tracker) {
@@ -57,6 +58,39 @@ class JvmManager {
         false, tracker);
   }
   
+  /*
+   * Saves pid of the given taskJvm
+   */
+  void setPidToJvm(JVMId jvmId, String pid) {
+    if (jvmId.isMapJVM()) {
+      mapJvmManager.jvmIdToPid.put(jvmId, pid);
+    }
+    else {
+      reduceJvmManager.jvmIdToPid.put(jvmId, pid);
+    }
+  }
+  
+  /*
+   * Returns the pid of the task
+   */
+  String getPid(TaskRunner t) {
+    if (t != null && t.getTask() != null) {
+      if (t.getTask().isMapTask()) {
+        JVMId id = mapJvmManager.runningTaskToJvm.get(t);
+        if (id != null) {
+          return mapJvmManager.jvmIdToPid.get(id);
+        }
+      } else {
+        JVMId id = reduceJvmManager.runningTaskToJvm.get(t);
+        if (id != null) {
+          return reduceJvmManager.jvmIdToPid.get(id);
+        }
+      }
+    }
+    return null;
+  }
+  
+  
   public void stop() {
     mapJvmManager.stop();
     reduceJvmManager.stop();
@@ -119,6 +153,10 @@ class JvmManager {
     //Mapping from the JVM IDs to Reduce JVM processes
     Map <JVMId, JvmRunner> jvmIdToRunner = 
       new HashMap<JVMId, JvmRunner>();
+    //Mapping from the JVM IDs to process IDs
+    Map <JVMId, String> jvmIdToPid = 
+      new HashMap<JVMId, String>();
+    
     int maxJvms;
     boolean isMap;
     
@@ -208,6 +246,7 @@ class JvmManager {
     
     synchronized private void removeJvm(JVMId jvmId) {
       jvmIdToRunner.remove(jvmId);
+      jvmIdToPid.remove(jvmId);
     }
     private synchronized void reapJvm( 
         TaskRunner t, JvmEnv env) {
@@ -314,13 +353,13 @@ class JvmManager {
       jvmRunner.start();
     }
     synchronized private void updateOnJvmExit(JVMId jvmId, 
-        int exitCode, boolean killed) {
+        int exitCode) {
       removeJvm(jvmId);
       TaskRunner t = jvmToRunningTask.remove(jvmId);
 
       if (t != null) {
         runningTaskToJvm.remove(t);
-        if (!killed && exitCode != 0) {
+        if (exitCode != 0) {
           t.setExitCode(exitCode);
         }
         t.signalDone();
@@ -364,8 +403,11 @@ class JvmManager {
           if (shexec == null) {
             return;
           }
+          
+          kill();
+          
           int exitCode = shexec.getExitCode();
-          updateOnJvmExit(jvmId, exitCode, killed);
+          updateOnJvmExit(jvmId, exitCode);
           LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
               numTasksRan);
           try {
@@ -381,16 +423,23 @@ class JvmManager {
       }
 
       public void kill() {
-        TaskController controller = tracker.getTaskController();
-        //Check inital context before issuing a kill to prevent situations
-        //where kill is issued before task is launched.
-        if(initalContext != null && initalContext.env != null) {
-          controller.killTaskJVM(initalContext);
-        } else {
-          LOG.info(String.format("JVM Not killed %s but just removed", 
-              jvmId.toString()));
+        if (!killed) {
+          killed = true;
+          TaskController controller = tracker.getTaskController();
+          // Check initial context before issuing a kill to prevent situations
+          // where kill is issued before task is launched.
+          if (initalContext != null && initalContext.env != null) {
+            initalContext.pid = jvmIdToPid.get(jvmId);
+            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
+              .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                  ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+            controller.killTaskJVM(initalContext);
+          } else {
+            LOG.info(String.format("JVM Not killed %s but just removed", jvmId
+                .toString()));
+          }
+          removeJvm(jvmId);
         }
-        removeJvm(jvmId);
       }
       
       public void taskRan() {
@@ -415,21 +464,19 @@ class JvmManager {
     File stdout;
     File stderr;
     File workDir;
-    String pidFile;
     long logSize;
     JobConf conf;
     Map<String, String> env;
 
     public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
         File stderr, long logSize, File workDir, Map<String,String> env,
-        String pidFile, JobConf conf) {
+        JobConf conf) {
       this.setup = setup;
       this.vargs = vargs;
       this.stdout = stdout;
       this.stderr = stderr;
       this.workDir = workDir;
       this.env = env;
-      this.pidFile = pidFile;
       this.conf = conf;
     }
   }

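The jvmIdToPid map above is filled in when a child JVM first reports in with its JvmContext; a hypothetical sketch of such a call site (the actual TaskTracker-side wiring is in the TaskTracker hunk, which this mail truncates):

    package org.apache.hadoop.mapred;

    class PidRegistrationSketch {
      private final JvmManager jvmManager;

      PidRegistrationSketch(JvmManager jvmManager) {
        this.jvmManager = jvmManager;
      }

      // Record the pid so a later JvmRunner.kill() can look it up and
      // signal the whole process group.
      void registerJvm(JvmContext context) {
        jvmManager.setPidToJvm(context.jvmId, context.pid);
      }
    }
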
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 03:27:24 2011
@@ -126,7 +126,7 @@ class LinuxTaskController extends TaskCo
     // get the JVM command line.
     String cmdLine = 
       TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, env.pidFile);
+          env.logSize, true);
     StringBuffer sb = new StringBuffer();
     //export out all the environment variable before child command.
     for(Entry<String, String> entry : env.env.entrySet()) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:27:24 2011
@@ -270,7 +270,7 @@ class LocalJobRunner implements JobSubmi
 
     // TaskUmbilicalProtocol methods
 
-    public JvmTask getTask(JVMId jvmId) { return null; }
+    public JvmTask getTask(JvmContext context) { return null; }
 
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:27:24 2011
@@ -44,7 +44,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -59,7 +58,7 @@ import org.apache.hadoop.util.StringUtil
  */
 abstract public class Task implements Writable, Configurable {
   private static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
+    LogFactory.getLog(Task.class);
 
   // Counters used by Task subclasses
   protected static enum Counter { 
@@ -187,12 +186,7 @@ abstract public class Task implements Wr
   }
 
   Counters getCounters() { return counters; }
-  public void setPidFile(String pidFile) { 
-    this.pidFile = pidFile; 
-  }
-  public String getPidFile() { 
-    return pidFile; 
-  }
+
   
   /**
    * Get the job name for this task.
@@ -351,7 +345,6 @@ abstract public class Task implements Wr
     Text.writeString(out, username);
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);  
-    Text.writeString(out, pidFile);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -373,7 +366,6 @@ abstract public class Task implements Wr
     if (taskCleanup) {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
-    pidFile = Text.readString(in);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 03:27:24 2011
@@ -22,9 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -99,6 +97,10 @@ abstract class TaskController implements
     JvmEnv env;
     // the Shell executor executing the JVM for this task
     ShellCommandExecutor shExec; 
+    // process id of the task JVM
+    String pid;
+    // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+    long sleeptimeBeforeSigkill;
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 03:27:24 2011
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
  */
 public class TaskLog {
   private static final Log LOG =
-    LogFactory.getLog(TaskLog.class.getName());
+    LogFactory.getLog(TaskLog.class);
 
   private static final File LOG_DIR = 
     new File(System.getProperty("hadoop.log.dir"), 
@@ -403,7 +405,7 @@ public class TaskLog {
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(null, cmd, stdoutFilename,
-                              stderrFilename, tailLength, null );
+                              stderrFilename, tailLength, false);
   }
 
   /**
@@ -425,7 +427,7 @@ public class TaskLog {
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-        tailLength, null);
+        tailLength, false);
   }
 
   /**
@@ -438,9 +440,11 @@ public class TaskLog {
    * @param stdoutFilename The filename that stdout should be saved to
    * @param stderrFilename The filename that stderr should be saved to
    * @param tailLength The length of the tail to be saved.
-   * @param pidFileName The name of the pid-file
+   * @deprecated pid-files are no longer used; instead the pid is exported
+   *             to the environment variable JVM_PID.
    * @return the modified command that should be run
    */
+  @Deprecated
   public static List<String> captureOutAndError(List<String> setup,
                                                 List<String> cmd, 
                                                 File stdoutFilename,
@@ -448,15 +452,68 @@ public class TaskLog {
                                                 long tailLength,
                                                 String pidFileName
                                                ) throws IOException {
+    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
+        tailLength, false, pidFileName);
+  }
+  
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files.
+   * Setup commands such as setting memory limit can be passed which 
+   * will be executed before exec.
+   * If the tailLength is 0, the entire output will be saved.
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @param useSetsid Should setsid be used in the command or not.
+   * @deprecated pid-files are no longer used; instead the pid is exported
+   *             to the environment variable JVM_PID.
+   * @return the modified command that should be run
+   * 
+   */
+  @Deprecated
+  public static List<String> captureOutAndError(List<String> setup,
+      List<String> cmd, 
+      File stdoutFilename,
+      File stderrFilename,
+      long tailLength,
+      boolean useSetsid,
+      String pidFileName
+     ) throws IOException {
+    return captureOutAndError(setup,cmd, stdoutFilename, stderrFilename, tailLength,
+        useSetsid);
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files.
+   * Setup commands such as setting memory limit can be passed which 
+   * will be executed before exec.
+   * If the tailLength is 0, the entire output will be saved.
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @param useSetsid Should setsid be used in the command or not.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+      List<String> cmd, 
+      File stdoutFilename,
+      File stderrFilename,
+      long tailLength,
+      boolean useSetsid
+     ) throws IOException {
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
     String mergedCmd = buildCommandLine(setup, cmd,
         stdoutFilename,
         stderrFilename, tailLength,
-        pidFileName);
+        useSetsid);
     result.add(mergedCmd.toString());
-    return result;
+    return result; 
   }
   
   
@@ -464,17 +521,15 @@ public class TaskLog {
       List<String> cmd, 
       File stdoutFilename,
       File stderrFilename,
-      long tailLength, 
-      String pidFileName) throws IOException {
+      long tailLength,
+      boolean useSetSid) throws IOException {
     
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);
     StringBuffer mergedCmd = new StringBuffer();
-    // Spit out the pid to pidFileName
-    if (pidFileName != null) {
-      mergedCmd.append("echo $$ > ");
-      mergedCmd.append(pidFileName);
-      mergedCmd.append(" ;");
+    
+    if (!Shell.WINDOWS) {
+      mergedCmd.append(" export JVM_PID=`echo $$` ; ");
     }
 
     if (setup != null && setup.size() > 0) {
@@ -483,6 +538,9 @@ public class TaskLog {
     }
     if (tailLength > 0) {
       mergedCmd.append("(");
+    } else if (ProcessTree.isSetsidAvailable && useSetSid 
+        && !Shell.WINDOWS) {
+      mergedCmd.append("exec setsid ");
     } else {
       mergedCmd.append("exec ");
     }

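The new useSetsid overload produces the wrapper that exports JVM_PID and, when setsid is available, starts the JVM as a process group leader; a minimal sketch (hypothetical command and file paths):

    import java.io.File;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.mapred.TaskLog;

    public class CaptureSketch {
      public static void main(String[] args) throws IOException {
        List<String> cmd = Arrays.asList("java", "-Xmx200m", "MyTask");
        List<String> wrapped = TaskLog.captureOutAndError(
            null, cmd,
            new File("/tmp/stdout"), new File("/tmp/stderr"),
            0L,      // tailLength 0: keep the entire output
            true);   // useSetsid: prefix "exec setsid" where supported

        // On a non-Windows host with setsid installed, the merged command is
        // roughly: export JVM_PID=`echo $$` ; exec setsid java ...
        System.out.println(wrapped);  // [bash, -c, <merged command>]
      }
    }
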
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Fri Mar  4 03:27:24 2011
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
 
@@ -42,7 +43,6 @@ class TaskMemoryManagerThread extends Th
 
   private TaskTracker taskTracker;
   private long monitoringInterval;
-  private long sleepTimeBeforeSigKill;
 
   private long maxMemoryAllowedForAllTasks;
 
@@ -55,10 +55,7 @@ class TaskMemoryManagerThread extends Th
     this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
       taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
-        5000L),
-      taskTracker.getJobConf().getLong(
-        "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
-        ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL));
+        5000L));
 
     this.taskTracker = taskTracker;
   }
@@ -66,8 +63,7 @@ class TaskMemoryManagerThread extends Th
   // mainly for test purposes. note that the tasktracker variable is
   // not set here.
   TaskMemoryManagerThread(long maxMemoryAllowedForAllTasks,
-                            long monitoringInterval,
-                            long sleepTimeBeforeSigKill) {
+                            long monitoringInterval) {
     setName(this.getClass().getName());
 
     processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
@@ -77,15 +73,12 @@ class TaskMemoryManagerThread extends Th
     this.maxMemoryAllowedForAllTasks = maxMemoryAllowedForAllTasks;
 
     this.monitoringInterval = monitoringInterval;
-    
-    this.sleepTimeBeforeSigKill = sleepTimeBeforeSigKill;
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
+  public void addTask(TaskAttemptID tid, long memLimit) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
-          sleepTimeBeforeSigKill, pidFile);
+      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -104,16 +97,11 @@ class TaskMemoryManagerThread extends Th
     private String pidFile;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit, 
-        long sleepTimeBeforeSigKill, String pidFile) {
+        ProcfsBasedProcessTree pTree, long memLimit) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
-      if (this.pTree != null) {
-        this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
-      }
       this.memLimit = memLimit;
-      this.pidFile = pidFile;
     }
 
     public TaskAttemptID getTID() {
@@ -184,8 +172,8 @@ class TaskMemoryManagerThread extends Th
 
           // Initialize any uninitialized processTrees
           if (pId == null) {
-            // get pid from pid-file
-            pId = getPid(ptInfo.pidFile);
+            // get pid from taskAttemptId
+            pId = taskTracker.getPid(ptInfo.getTID());
             if (pId != null) {
+              // PID will be null either if the JVM has not yet reported its
+              // pid over the umbilical, or if the task is finished and the
+              // JVM entry was removed while the TIP is still retained in
+              // runningTasks till successful transmission to JT
 
               // create process tree object
-              ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
+              long sleeptimeBeforeSigkill = taskTracker.getJobConf().getLong(
+                  "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                  ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+              
+              ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(
+                  pId,ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
               LOG.debug("Tracking ProcessTree " + pId + " for the first time");
 
               ptInfo.setPid(pId);
@@ -233,7 +226,7 @@ class TaskMemoryManagerThread extends Th
             taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 
             // Now destroy the ProcessTree, remove it from monitoring map.
-            pTree.destroy();
+            pTree.destroy(true/*in the background*/);
             it.remove();
             LOG.info("Removed ProcessTree with root " + pId);
           } else {
@@ -368,7 +361,7 @@ class TaskMemoryManagerThread extends Th
         // Now destroy the ProcessTree, remove it from monitoring map.
         ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
         ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
-        pTree.destroy();
+        pTree.destroy(true/*in the background*/);
         processTreeInfoMap.remove(tid);
         LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
       }
@@ -377,17 +370,4 @@ class TaskMemoryManagerThread extends Th
           + "But found no alive task to kill for freeing memory.");
     }
   }
-
-  /**
-   * Load pid of the task from the pidFile.
-   * 
-   * @param pidFileName
-   * @return the pid of the task process.
-   */
-  private String getPid(String pidFileName) {
-    if ((new File(pidFileName)).exists()) {
-      return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName);
-     }
-     return null;
-  }
 }
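
A note on the hunks above: the SIGTERM-to-SIGKILL delay no longer rides in
ProcessTreeInfo; it is read from the TaskTracker's JobConf each time a process
tree is first built. A minimal sketch of tuning it from a configuration (the
property name is taken verbatim from the hunk; the default lives in the new
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL constant, whose value is not
shown in this part of the diff):

    // Sketch: raise the delay between SIGTERM and SIGKILL for task JVMs.
    // The property name comes from the hunk above; the default is whatever
    // ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL is defined to be.
    JobConf conf = new JobConf();
    conf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 10000L);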

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:27:24 2011
@@ -368,13 +368,8 @@ abstract class TaskRunner extends Thread
       vargs.add(address.getAddress().getHostAddress()); 
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
-
-      String pidFile = lDirAlloc.getLocalPathForWrite(
-            (TaskTracker.getPidFile(t.getJobID().toString(), 
-             taskid.toString(), t.isTaskCleanupTask())),
-            this.conf).toString();
-      t.setPidFile(pidFile);
-      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);
+      
+      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -443,7 +438,7 @@ abstract class TaskRunner extends Thread
 
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
-              workDir, env, pidFile, conf));
+              workDir, env, conf));
       synchronized (lock) {
         while (!done) {
           lock.wait();
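
The net effect on the launch path, condensed for readability (both call forms
appear in the hunk above; this is not new code):

    // Before HADOOP-5488: the runner allocated a pid file and threaded it
    // through to the memory manager and the JVM environment.
    //   tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);

    // After: tracking is keyed by the attempt id alone; the pid arrives
    // later over the umbilical (see the TaskTracker.getTask() hunk below).
    tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);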

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:27:24 2011
@@ -199,7 +199,6 @@ public class TaskTracker 
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
-  private static final String PID = "pid";
   private static final String OUTPUT = "output";
   private JobConf originalConf;
   private JobConf fConf;
@@ -419,13 +418,14 @@ public class TaskTracker 
 	return taskDir;
   }
 
-  static String getPidFile(String jobid, 
-                           String taskid, 
-                           boolean isCleanup) {
-    return  getLocalTaskDir(jobid, taskid, isCleanup)
-            + Path.SEPARATOR + PID;
+  String getPid(TaskAttemptID tid) {
+    TaskInProgress tip = tasks.get(tid);
+    if (tip != null) {
+      return jvmManager.getPid(tip.getTaskRunner());
+    }
+    return null;
   }
-
+  
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
     if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -760,7 +760,7 @@ public class TaskTracker 
     }
   }
 
-  private LocalDirAllocator lDirAlloc = 
+  private static LocalDirAllocator lDirAlloc = 
                               new LocalDirAllocator("mapred.local.dir");
 
   // intialize the job directory
@@ -1742,13 +1742,12 @@ public class TaskTracker 
   }
   
   void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
-                          JobConf conf, 
-                          String pidFile) {
+                          JobConf conf) {
     if (isTaskMemoryManagerEnabled()) {
       taskMemoryManager.addTask(attemptId, 
           isMap ? conf
               .getMemoryForMapTask() * 1024 * 1024L : conf
-              .getMemoryForReduceTask() * 1024 * 1024L, pidFile);
+              .getMemoryForReduceTask() * 1024 * 1024L);
     }
   }
 
@@ -2534,9 +2533,12 @@ public class TaskTracker 
   /**
    * Called upon startup by the child process, to fetch Task data.
    */
-  public synchronized JvmTask getTask(JVMId jvmId) 
+  public synchronized JvmTask getTask(JvmContext context) 
   throws IOException {
+    JVMId jvmId = context.jvmId;
     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
+    // save pid of task JVM sent by child
+    jvmManager.setPidToJvm(jvmId, context.pid);
     if (!jvmManager.isJvmKnown(jvmId)) {
       LOG.info("Killing unknown JVM " + jvmId);
       return new JvmTask(null, true);
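
With getTask(JvmContext), the child JVM reports its own pid on its first call
home, and the TaskTracker records it via jvmManager.setPidToJvm() before
handing out a task. A sketch of the child side, under stated assumptions: the
JvmContext constructor and the JVM_PID environment variable are assumptions,
and the committed child-side wiring is not shown in this part of the diff.

    // Hypothetical child-side handshake; names per the assumptions above.
    static JvmTask fetchFirstTask(TaskUmbilicalProtocol umbilical, JVMId jvmId)
        throws IOException {
      String pid = System.getenv("JVM_PID");  // assumed: set by the launcher
      return umbilical.getTask(new JvmContext(jvmId, pid));
    }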

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Mar  4 03:27:24 2011
@@ -55,17 +55,19 @@ interface TaskUmbilicalProtocol extends 
    * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * Version 16 Added fatalError for child to communicate fatal errors to TT
    * Version 16 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+   * Version 17 Change in signature of getTask() for HADOOP-5488
    * */
 
-  public static final long versionID = 16L;
+  public static final long versionID = 17L;
   
   /**
    * Called when a child task process starts, to get its task.
-   * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
+   * @param context the JvmContext of the JVM w.r.t the TaskTracker that
+   *        launched it
    * @return Task object
    * @throws IOException 
    */
-  JvmTask getTask(JVMId jvmId) throws IOException;
+  JvmTask getTask(JvmContext context) throws IOException;
 
   /**
    * Report child's progress to parent.
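
JvmContext itself is added by this patch but not shown in this part of the
diff; a plausible minimal shape is sketched below purely so the new signature
is easy to read. The field names match the TaskTracker hunk that dereferences
context.jvmId and context.pid; the Writable plumbing is an assumption, not the
committed file.

    // Plausible shape only -- not the committed JvmContext.java.
    class JvmContext implements org.apache.hadoop.io.Writable {
      JVMId jvmId;   // which child JVM is calling
      String pid;    // OS pid of that JVM, reported by the child itself
      public void write(java.io.DataOutput out) throws java.io.IOException {
        jvmId.write(out);
        org.apache.hadoop.io.Text.writeString(out, pid);
      }
      public void readFields(java.io.DataInput in) throws java.io.IOException {
        jvmId = new JVMId();
        jvmId.readFields(in);
        pid = org.apache.hadoop.io.Text.readString(in);
      }
    }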

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Fri Mar  4 03:27:24 2011
@@ -96,7 +96,8 @@ class Application<K1 extends WritableCom
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+    cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
+        false);
 
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
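
The widened captureOutAndError() takes a setup-command list up front and a
trailing boolean; pipes passes null and false here, so the C++ child is
wrapped the same way as before. The boolean appears to select setsid wrapping,
so that a task's whole subtree can be signalled as one session; a sketch of
the opposite choice (the parameter name 'useSetsid' is an assumption):

    // Sketch: a caller that wants the child's whole subtree signalable as
    // one unit might pass true ('useSetsid' is an assumed parameter name).
    java.util.List<String> wrapped = TaskLog.captureOutAndError(
        null, cmd, stdout, stderr, logLength, true /*useSetsid*/);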

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1076975&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Fri Mar  4 03:27:24 2011
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A JUnit test that kills a job whose tasks spawn children, and checks that
+ * the children (subprocesses of the Java task) are also killed with the task.
+ */
+public class TestKillSubProcesses extends TestCase {
+
+  private static volatile Log LOG = LogFactory
+            .getLog(TestKillSubProcesses.class);
+
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+  private static JobClient jobClient = null;
+
+  private static MiniMRCluster mr = null;
+  private static Path scriptDir = null;
+  private static String scriptDirName = null;
+  private static String pid = null;
+
+  // number of levels in the subtree of subprocesses of map task
+  private static int numLevelsOfSubProcesses = 4;
+
+  /**
+   * Runs a job, kills the job and verifies if the map task and its
+   * subprocesses are also killed properly or not.
+   */
+  private static void runKillingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
+
+    conf.setJobName("testkilljobsubprocesses");
+    conf.setMapperClass(KillingMapperWithChildren.class);
+    
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+
+    // kill the job now
+    job.killJob();
+
+    while (job.cleanupProgress() == 0.0f) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        LOG.warn("sleep is interrupted:" + ie);
+        break;
+      }
+    }
+
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(job.getJobState(), JobStatus.KILLED);
+  }
+
+  /**
+   * Runs a job that will fail and verifies if the subprocesses of failed map
+   * task are killed properly or not.
+   */
+  private static void runFailingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
+
+    conf.setJobName("testfailjobsubprocesses");
+    conf.setMapperClass(FailingMapperWithChildren.class);
+    
+    // We don't want to run the failing map task 4 times. So we run it once and
+    // check if all the subprocesses are killed properly.
+    conf.setMaxMapAttempts(1);
+    
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+    signalTask(TEST_ROOT_DIR + "/failjob/signalFile", conf);
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(job.getJobState(), JobStatus.FAILED);
+  }
+  
+  /**
+   * Runs a job that will succeed and verifies if the subprocesses of succeeded
+   * map task are killed properly or not.
+   */
+  private static void runSuccessfulJobAndValidate(JobTracker jt, JobConf conf)
+               throws IOException {
+
+    conf.setJobName("testsucceedjobsubprocesses");
+    conf.setMapperClass(MapperWithChildren.class);
+
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+    signalTask(TEST_ROOT_DIR + "/succeedjob/signalFile", conf);
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(job.getJobState(), JobStatus.SUCCEEDED);
+  }
+
+  /**
+   * Runs the given job and saves the pid of map task.
+   * Also checks if the subprocesses of map task are alive.
+   */
+  private static RunningJob runJobAndSetProcessHandle(JobTracker jt, JobConf conf)
+                     throws IOException {
+    RunningJob job = runJob(conf);
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    pid = null;
+    scriptDirName = scriptDir.toUri().getPath();
+    jobClient = new JobClient(conf);
+    
+    // get the taskAttemptID of the map task and use it to get the pid
+    // of map task
+    TaskReport[] mapReports = jobClient.getMapTaskReports(job.getID());
+
+    JobInProgress jip = jt.getJob(job.getID());
+    for(TaskReport tr : mapReports) {
+      TaskInProgress tip = jip.getTaskInProgress(tr.getTaskID());
+
+      // for this tip, get active tasks of all attempts
+      while(tip.getActiveTasks().size() == 0) {
+        //wait till the activeTasks Tree is built
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted:" + ie);
+          break;
+        }
+      }
+
+      for (Iterator<TaskAttemptID> it = 
+        tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
+        TaskAttemptID id = it.next();
+        LOG.info("taskAttemptID of map task is " + id);
+        
+        while(pid == null) {
+          pid = mr.getTaskTrackerRunner(0).getTaskTracker().getPid(id);
+          if (pid == null) {
+            try {
+              Thread.sleep(500);
+            } catch(InterruptedException e) {}
+          }
+        }
+        LOG.info("pid of map task is " + pid);
+
+        // Checking if the map task is alive
+        assertTrue(ProcessTree.isAlive(pid));
+        LOG.info("The map task is alive before Job completion, as expected.");
+      }
+    }
+
+    // Checking if the descendant processes of map task are alive
+    if(ProcessTree.isSetsidAvailable) {
+      String childPid = UtilsForTests.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + 0);
+      while(childPid == null) {
+        LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted:" + ie);
+          break;
+        }
+        childPid = UtilsForTests.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + 0);
+      }
+
+      // As childPidFile0(leaf process in the subtree of processes with
+      // map task as root) is created, all other child pid files should
+      // have been created already(See the script for details).
+      // Now check if the descendants of map task are alive.
+      for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+        childPid = UtilsForTests.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + i);
+        LOG.info("pid of the descendant process at level " + i +
+                 "in the subtree of processes(with the map task as the root)" +
+                 " is " + childPid);
+        assertTrue("Unexpected: The subprocess at level " + i +
+                   " in the subtree is not alive before Job completion",
+                   ProcessTree.isAlive(childPid));
+      }
+    }
+    return job;
+  }
+  
+  /**
+   * Verifies if the subprocesses of the map task are killed properly.
+   */
+  private static void validateKillingSubprocesses(RunningJob job, JobConf conf)
+                   throws IOException {
+    // wait till the job finishes
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    // Checking if the map task got killed or not
+    assertTrue(!ProcessTree.isAlive(pid));
+    LOG.info("The map task is not alive after Job is completed, as expected.");
+
+    // Checking if the descendant processes of map task are killed properly
+    if(ProcessTree.isSetsidAvailable) {
+      for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+        String childPid = UtilsForTests.getPidFromPidFile(
+                               scriptDirName + "/childPidFile" + i);
+        LOG.info("pid of the descendant process at level " + i +
+                 "in the subtree of processes(with the map task as the root)" +
+                 " is " + childPid);
+        assertTrue("Unexpected: The subprocess at level " + i +
+                   " in the subtree is alive after Job completion",
+                   !ProcessTree.isAlive(childPid));
+      }
+    }
+    FileSystem fs = FileSystem.get(conf);
+    if(fs.exists(scriptDir)) {
+      fs.delete(scriptDir, true);
+    }
+  }
+  
+  private static RunningJob runJob(JobConf conf) throws IOException {
+
+    final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
+    final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+
+    FileSystem fs = FileSystem.get(conf);
+    if(fs.exists(scriptDir)) {
+      fs.delete(scriptDir, true);
+    }
+
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    conf.set("test.build.data", TEST_ROOT_DIR);
+
+    return UtilsForTests.runJob(conf, inDir, outDir);
+  }
+
+  public void testJobKillFailAndSucceed() throws IOException {
+    if (Shell.WINDOWS) {
+      System.out.println(
+             "setsid doesn't work on WINDOWS as expected. Not testing");
+      return;
+    }
+    
+    JobConf conf=null;
+    try {
+      mr = new MiniMRCluster(1, "file:///", 1);
+
+      // run the TCs
+      conf = mr.createJobConf();
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      runKillingJobAndValidate(jt, conf);
+      runFailingJobAndValidate(jt, conf);
+      runSuccessfulJobAndValidate(jt, conf);
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Creates signal file
+   */
+  private static void signalTask(String signalFile, JobConf conf) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      fs.createNewFile(new Path(signalFile));
+    } catch(IOException e) {
+      LOG.warn("Unable to create signal file. " + e);
+    }
+  }
+  
+  /**
+   * Runs a recursive shell script to create a chain of subprocesses
+   */
+  private static void runChildren(JobConf conf) throws IOException {
+    if (ProcessTree.isSetsidAvailable) {
+      FileSystem fs = FileSystem.get(conf);
+      TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
+      scriptDir = new Path(TEST_ROOT_DIR + "/script");  
+    
+      // create shell script
+      Random rm = new Random();
+      Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
+        + ".sh");
+      String shellScript = scriptPath.toString();
+      String script =
+        "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
+        "echo hello\n" +
+        "if [ $1 != 0 ]\nthen\n" +
+        " sh " + shellScript + " $(($1-1))\n" +
+        "else\n" +
+        " while true\n do\n" +
+        "  sleep 2\n" +
+        " done\n" +
+        "fi";
+      DataOutputStream file = fs.create(scriptPath);
+      file.writeBytes(script);
+      file.close();
+
+      LOG.info("Calling script from map task of failjob : " + shellScript);
+      Runtime.getRuntime()
+          .exec(shellScript + " " + numLevelsOfSubProcesses);
+    
+      String childPid = UtilsForTests.getPidFromPidFile(scriptDir
+          + "/childPidFile" + 0);
+      while (childPid == null) {
+        LOG.warn(scriptDir + "/childPidFile" + 0 + " is null; Sleeping...");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted:" + ie);
+          break;
+        }
+        childPid = UtilsForTests.getPidFromPidFile(scriptDir
+            + "/childPidFile" + 0);
+      }
+    }
+  }
+  
+  /**
+   * Mapper that starts children
+   */
+  static class MapperWithChildren extends MapReduceBase implements
+  Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+    FileSystem fs = null;
+    final Path signal = new Path(TEST_ROOT_DIR + "/script/signalFile");
+    public void configure(JobConf conf) {
+      try {
+        runChildren(conf);
+      } catch (Exception e) {
+        LOG.warn("Exception in configure: " +
+                 StringUtils.stringifyException(e));
+      }
+    }
+    
+    // Mapper waits for the signal (the signal is the existence of a file)
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      if (fs != null) {
+        while (!fs.exists(signal)) {// wait for signal file creation
+          try {
+            reporter.progress();
+            synchronized (this) {
+              this.wait(1000);
+            }
+          } catch (InterruptedException ie) {
+            System.out.println("Interrupted while the map was waiting for"
+                               + " the signal.");
+            break;
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Mapper that waits till it gets killed.
+   */
+  static class KillingMapperWithChildren extends MapperWithChildren {
+    public void configure(JobConf conf) {
+      super.configure(conf);
+    }
+    
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      try {
+        while(true) {//just wait till kill happens
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in KillMapperWithChild.map:" + e);
+      }
+    }
+  }
+  
+  /**
+   * Mapper that fails when it receives a signal (the existence of a file).
+   */
+  static class FailingMapperWithChildren extends MapperWithChildren {
+    public void configure(JobConf conf) {
+      super.configure(conf);
+    }
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      if (fs != null) {
+        while (!fs.exists(signal)) {// wait for signal file creation
+          try {
+            reporter.progress();
+            synchronized (this) {
+              this.wait(1000);
+            }
+          } catch (InterruptedException ie) {
+            System.out.println("Interrupted while the map was waiting for"
+                               + " the signal.");
+            break;
+          }
+        }
+      }
+      throw new RuntimeException("failing map");
+    }
+  }
+}
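
For readers tracing the test: the generated script calls itself with $1-1
until it reaches 0, so a run with numLevelsOfSubProcesses=4 leaves pid files
childPidFile4 down to childPidFile0, the leaf spinning in a sleep loop. The
poll-then-assert pattern repeated above could be factored as below
(illustrative only; not part of the patch):

    // Illustrative helper mirroring the loops in the test above: wait for a
    // pid file to appear, then assert the process it names is still alive.
    static void waitAndAssertAlive(String pidFileName)
        throws InterruptedException {
      String p = UtilsForTests.getPidFromPidFile(pidFileName);
      while (p == null) {
        Thread.sleep(500);
        p = UtilsForTests.getPidFromPidFile(pidFileName);
      }
      assertTrue("Process " + p + " expected to be alive",
                 ProcessTree.isAlive(p));
    }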

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Fri Mar  4 03:27:24 2011
@@ -440,7 +440,6 @@ public class TestTaskTrackerMemoryManage
       
       // Create TaskMemoryMonitorThread
       TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L,
-                                                                5000L,
                                                                 5000L);
       // create process trees
       // tree rooted at 100 is over limit immediately, as it is
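
The dropped middle argument was the per-monitor sigkill delay, which now
travels through the JobConf property noted earlier instead of the constructor.
The roles of the two surviving arguments are inferred from the test's context
and should be treated as an assumption:

    // Assumed roles: (memory limit over all tasks, monitoring interval in ms).
    TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L, 5000L);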

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 03:27:24 2011
@@ -548,7 +548,9 @@ public class UtilsForTests {
                            int numReds) throws IOException {
 
     FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir, true);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
     if (!fs.exists(inDir)) {
       fs.mkdirs(inDir);
     }
@@ -683,5 +685,52 @@ public class UtilsForTests {
     config.writeXml(fos);
     fos.close();
   }
+
+  
+  /**
+   * Get PID from a pid-file.
+   * 
+   * @param pidFileName
+   *          Name of the pid-file.
+   * @return the PID string read from the pid-file. Returns null if
+   *         pidFileName points to a non-existing file or if the read from
+   *         the file fails.
+   */
+  public static String getPidFromPidFile(String pidFileName) {
+    BufferedReader pidFile = null;
+    FileReader fReader = null;
+    String pid = null;
+
+    try {
+      fReader = new FileReader(pidFileName);
+      pidFile = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      LOG.debug("PidFile doesn't exist : " + pidFileName);
+      return pid;
+    }
+
+    try {
+      pid = pidFile.readLine();
+    } catch (IOException i) {
+      LOG.error("Failed to read from " + pidFileName);
+    } finally {
+      try {
+        if (fReader != null) {
+          fReader.close();
+        }
+        try {
+          if (pidFile != null) {
+            pidFile.close();
+          }
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + pidFile);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+    return pid;
+  }
+  
 }
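
The nested close logic above is what Java 6 (the JDK this branch builds
against) requires; for comparison only, the same method collapses to a
try-with-resources on Java 7+ (equivalent behavior, not proposed for this
branch):

    // Java 7+ equivalent of getPidFromPidFile(), shown for comparison only.
    public static String getPidFromPidFile(String pidFileName) {
      try (java.io.BufferedReader pidFile = new java.io.BufferedReader(
               new java.io.FileReader(pidFileName))) {
        return pidFile.readLine();            // the first line holds the pid
      } catch (java.io.FileNotFoundException f) {
        return null;                          // pid file not written yet
      } catch (java.io.IOException i) {
        return null;                          // read failed
      }
    }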