Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/09 06:38:02 UTC

svn commit: r693359 - in /hadoop/core/trunk: ./ conf/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/util/

Author: ddas
Date: Mon Sep  8 21:38:01 2008
New Revision: 693359

URL: http://svn.apache.org/viewvc?rev=693359&view=rev
Log:
HADOOP-3581. Prevents memory intensive user tasks from taking down nodes. Contributed by Vinod K V.

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep  8 21:38:01 2008
@@ -135,6 +135,9 @@
     HADOOP-3941. Extend FileSystem API to return file-checksums.
     (szetszwo)
 
+    HADOOP-3581. Prevents memory intensive user tasks from taking down 
+    nodes. (Vinod K V via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Mon Sep  8 21:38:01 2008
@@ -723,6 +723,21 @@
 </property>
 
 <property>
+  <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
+  <value>5000</value>
+  <description>The interval, in milliseconds, for which the tasktracker waits
+   between two cycles of monitoring its tasks' memory usage.</description>
+</property>
+
+<property>
+  <name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name>
+  <value>5000</value>
+  <description>The time, in milliseconds, the tasktracker waits for sending a
+  SIGKILL to a process that has overrun memory limits, after it has been sent
+  a SIGTERM.</description>
+</property>
+
+<property>
   <name>mapred.map.tasks</name>
   <value>2</value>
   <description>The default number of map tasks per job.  Typically set
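
For reference, a minimal sketch of how these two settings are consumed;
this mirrors the TaskMemoryManagerThread constructor added later in this
commit (taskTracker stands for the enclosing TaskTracker instance):

    // Defaults match the values shipped in hadoop-default.xml above.
    JobConf conf = taskTracker.getJobConf();
    long monitoringInterval = conf.getLong(
        "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
    long sleepTimeBeforeSigKill = conf.getLong(
        "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
        ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);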

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Mon Sep  8 21:38:01 2008
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A process-tree built from the /proc file-system. Works on Linux and on
+ * Windows with Cygwin.
+ */
+public class ProcfsBasedProcessTree {
+
+  private static final Log LOG = LogFactory
+      .getLog(ProcfsBasedProcessTree.class);
+
+  private static final String PROCFS = "/proc/";
+  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+  private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
+      .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
+
+  private Integer pid = -1;
+
+  private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
+
+  public ProcfsBasedProcessTree(String pid) {
+    this.pid = getValidPID(pid);
+  }
+
+  public void setSigKillInterval(long interval) {
+    sleepTimeBeforeSigKill = interval;
+  }
+
+  /**
+   * Checks if the ProcfsBasedProcessTree is available on this system.
+   * 
+   * @return true if ProcfsBasedProcessTree is available. False otherwise.
+   */
+  public static boolean isAvailable() {
+    try {
+      String osName = System.getProperty("os.name");
+      if (!osName.startsWith("Linux") && !osName.startsWith("Windows")) {
+        LOG.info("ProcfsBasedProcessTree is currently supported only "
+            + "on Linux and Windows");
+        return false;
+      }
+    } catch (SecurityException se) {
+      LOG.warn("Failed to get Operating System name. " + se);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Get the process-tree with latest state.
+   * 
+   * @return the process-tree with latest state.
+   */
+  public ProcfsBasedProcessTree getProcessTree() {
+    if (pid != -1) {
+      // Get the list of processes
+      List<Integer> processList = getProcessList();
+
+      Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+      processTree.clear();
+
+      ProcessInfo me = null;
+      for (Integer proc : processList) {
+        // Get information for each process
+        ProcessInfo pInfo = new ProcessInfo(proc);
+        constructProcessInfo(pInfo);
+        allProcessInfo.put(proc, pInfo);
+        if (proc.equals(this.pid)) {
+          me = pInfo; // cache 'me'
+          processTree.put(proc, pInfo);
+        }
+      }
+
+      // Add each process to its parent.
+      for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
+        Integer pID = entry.getKey();
+        if (pID != 1) {
+          ProcessInfo pInfo = entry.getValue();
+          ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
+          if (parentPInfo != null) {
+            parentPInfo.addChild(pInfo);
+          }
+        }
+      }
+
+      // The root process may have vanished before we could read its info.
+      if (me == null) {
+        return this;
+      }
+
+      // now start constructing the process-tree
+      LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>();
+      pInfoQueue.addAll(me.getChildren());
+      while (!pInfoQueue.isEmpty()) {
+        ProcessInfo pInfo = pInfoQueue.remove();
+        if (!processTree.containsKey(pInfo.getPid())) {
+          processTree.put(pInfo.getPid(), pInfo);
+        }
+        pInfoQueue.addAll(pInfo.getChildren());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        // Log.debug the ProcfsBasedProcessTree
+        LOG.debug(this.toString());
+      }
+    }
+    return this;
+  }
+
+  /**
+   * Is the process-tree alive? Currently we care only about the status of the
+   * root-process.
+   * 
+   * @return true if the process-tree is alive, false otherwise.
+   */
+  public boolean isAlive() {
+    if (pid == -1) {
+      return false;
+    } else {
+      return this.isAlive(pid);
+    }
+  }
+
+  /**
+   * Destroy the process-tree. Currently we only make sure the root process is
+   * gone. It is the responsibility of the root process to make sure that all
+   * its descendants are cleaned up.
+   */
+  public void destroy() {
+    LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
+    if (pid == -1) {
+      return;
+    }
+    ShellCommandExecutor shexec = null;
+
+    if (isAlive(this.pid)) {
+      try {
+        String[] args = { "kill", this.pid.toString() };
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (IOException ioe) {
+        LOG.warn("Error executing shell command " + ioe);
+      } finally {
+        LOG.info("Killing " + pid + " with SIGTERM. Exit code "
+            + shexec.getExitCode());
+      }
+    }
+
+    SigKillThread sigKillThread = new SigKillThread();
+    sigKillThread.setDaemon(true);
+    sigKillThread.start();
+  }
+
+  /**
+   * Get the cumulative virtual memory used by all the processes in the
+   * process-tree.
+   * 
+   * @return cumulative virtual memory used by the process-tree in bytes.
+   */
+  public long getCumulativeVmem() {
+    long total = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if (p != null) {
+        total += p.getVmem();
+      }
+    }
+    return total;
+  }
+
+  /**
+   * Get PID from a pid-file.
+   * 
+   * @param pidFileName
+   *          Name of the pid-file.
+   * @return the PID string read from the pid-file. Returns null if the
+   *         pidFileName points to a non-existing file or if read fails from the
+   *         file.
+   */
+  public static String getPidFromPidFile(String pidFileName) {
+    BufferedReader pidFile = null;
+    FileReader fReader = null;
+    String pid = null;
+
+    try {
+      fReader = new FileReader(pidFileName);
+      pidFile = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      LOG.debug("PidFile doesn't exist : " + pidFileName);
+      return pid;
+    }
+
+    try {
+      pid = pidFile.readLine();
+    } catch (IOException i) {
+      LOG.error("Failed to read from " + pidFileName);
+    } finally {
+      try {
+        if (fReader != null) {
+          fReader.close();
+        }
+        try {
+          if (pidFile != null) {
+            pidFile.close();
+          }
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + pidFile);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+    return pid;
+  }
+
+  private Integer getValidPID(String pid) {
+    Integer retPid = -1;
+    try {
+      retPid = Integer.parseInt(pid);
+      if (retPid <= 0) {
+        retPid = -1;
+      }
+    } catch (NumberFormatException nfe) {
+      retPid = -1;
+    }
+    return retPid;
+  }
+
+  /**
+   * Get the list of all processes in the system.
+   */
+  private List<Integer> getProcessList() {
+    String[] processDirs = (new File(PROCFS)).list();
+    List<Integer> processList = new ArrayList<Integer>();
+
+    for (String dir : processDirs) {
+      try {
+        int pd = Integer.parseInt(dir);
+        if ((new File(PROCFS + dir)).isDirectory()) {
+          processList.add(Integer.valueOf(pd));
+        }
+      } catch (NumberFormatException n) {
+        // skip this directory
+      } catch (SecurityException s) {
+        // skip this process
+      }
+    }
+    return processList;
+  }
+
+  /**
+   * Construct the ProcessInfo using the process' PID and procfs, and return
+   * the same. Returns null on failing to read from procfs.
+   */
+  private ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+    ProcessInfo ret = null;
+    // Read "/proc/<pid>/stat" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(PROCFS + pinfo.getPid() + "/stat");
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // The process vanished in the interim!
+      return ret;
+    }
+
+    ret = pinfo;
+    try {
+      String str = in.readLine(); // only one line
+      Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
+      boolean mat = m.find();
+      if (mat) {
+        // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
+        pinfo.update(m.group(2), Integer.parseInt(m.group(3)), Integer
+            .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
+            .parseLong(m.group(7)));
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+      ret = null;
+    } finally {
+      // Close the streams
+      try {
+        if (fReader != null) {
+          fReader.close();
+        }
+        try {
+          if (in != null) {
+            in.close();
+          }
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+
+    return ret;
+  }
+
+  /**
+   * Is the process with PID pid still alive?
+   */
+  private boolean isAlive(Integer pid) {
+    // This method assumes that isAlive is called on a pid that was alive not
+    // too long ago, and hence assumes no chance of pid-wrapping-around.
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "-0", pid.toString() };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (ExitCodeException ee) {
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command "
+          + Arrays.toString(shexec.getExecString()) + ioe);
+      return false;
+    }
+    return shexec.getExitCode() == 0;
+  }
+
+  /**
+   * Helper thread class that kills process-tree with SIGKILL in background
+   */
+  private class SigKillThread extends Thread {
+
+    public void run() {
+      this.setName(this.getClass().getName() + "-" + String.valueOf(pid));
+      ShellCommandExecutor shexec = null;
+
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        LOG.warn("Thread sleep is interrupted.");
+      }
+
+      // Kill the root process with SIGKILL if it is still alive
+      if (ProcfsBasedProcessTree.this.isAlive(pid)) {
+        try {
+          String[] args = { "kill", "-9", pid.toString() };
+          shexec = new ShellCommandExecutor(args);
+          shexec.execute();
+        } catch (IOException ioe) {
+          LOG.warn("Error executing shell command " + ioe);
+        } finally {
+          LOG.info("Killing " + pid + " with SIGKILL. Exit code "
+              + shexec.getExitCode());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a string printing PIDs of processes present in the
+   * ProcfsBasedProcessTree. Output format : [pid pid ...]
+   */
+  public String toString() {
+    StringBuffer pTree = new StringBuffer("[ ");
+    for (Integer p : processTree.keySet()) {
+      pTree.append(p);
+      pTree.append(" ");
+    }
+    return pTree.toString() + "]";
+  }
+
+  /**
+   * 
+   * Class containing information of a process.
+   * 
+   */
+  private static class ProcessInfo {
+    private Integer pid; // process-id
+    private String name; // command name
+    private Integer pgrpId; // process group-id
+    private Integer ppid; // parent process-id
+    private Integer sessionId; // session-id
+    private Long vmem; // virtual memory usage
+    private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
+
+    public ProcessInfo(int pid) {
+      this.pid = Integer.valueOf(pid);
+    }
+
+    public Integer getPid() {
+      return pid;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public Integer getPgrpId() {
+      return pgrpId;
+    }
+
+    public Integer getPpid() {
+      return ppid;
+    }
+
+    public Integer getSessionId() {
+      return sessionId;
+    }
+
+    public Long getVmem() {
+      return vmem;
+    }
+
+    public boolean isParent(ProcessInfo p) {
+      if (pid.equals(p.getPpid())) {
+        return true;
+      }
+      return false;
+    }
+
+    public void update(String name, Integer ppid, Integer pgrpId,
+        Integer sessionId, Long vmem) {
+      this.name = name;
+      this.ppid = ppid;
+      this.pgrpId = pgrpId;
+      this.sessionId = sessionId;
+      this.vmem = vmem;
+    }
+
+    public boolean addChild(ProcessInfo p) {
+      return children.add(p);
+    }
+
+    public List<ProcessInfo> getChildren() {
+      return children;
+    }
+  }
+}
\ No newline at end of file
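
A minimal usage sketch of the class above, modeled on
TestProcfsBasedProcessTree later in this commit; the pid string and the
limit are hypothetical, and would normally come from a pid-file and the
job configuration:

    ProcfsBasedProcessTree tree = new ProcfsBasedProcessTree("1234");
    tree.setSigKillInterval(5000L);            // ms between SIGTERM and SIGKILL
    tree = tree.getProcessTree();              // snapshot /proc, build the tree
    long vmemBytes = tree.getCumulativeVmem(); // root + all descendants
    long limit = 15L * 1000 * 1000;            // hypothetical limit, in bytes
    if (vmemBytes > limit) {
      tree.destroy();                          // SIGTERM now, SIGKILL later
    }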

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Mon Sep  8 21:38:01 2008
@@ -216,7 +216,7 @@
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(null, cmd, stdoutFilename,
-                              stderrFilename, tailLength );
+                              stderrFilename, tailLength, null );
   }
 
   /**
@@ -237,12 +237,44 @@
                                                 File stderrFilename,
                                                 long tailLength
                                                ) throws IOException {
+    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
+        tailLength, null);
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files.
+   * Setup commands such as setting memory limit can be passed which 
+   * will be executed before exec.
+   * If the tailLength is 0, the entire output will be saved.
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @param pidFileName The name of the pid-file
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+                                                List<String> cmd, 
+                                                File stdoutFilename,
+                                                File stderrFilename,
+                                                long tailLength,
+                                                String pidFileName
+                                               ) throws IOException {
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
     StringBuffer mergedCmd = new StringBuffer();
+    
+    // Spit out the pid to pidFileName
+    if (pidFileName != null) {
+      mergedCmd.append("echo $$ > ");
+      mergedCmd.append(pidFileName);
+      mergedCmd.append(" ;");
+    }
+
     if (setup != null && setup.size() > 0) {
       mergedCmd.append(addCommand(setup, false));
       mergedCmd.append(";");
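
Illustrative only: with a pid-file name supplied, the wrapped command now
starts by writing the shell's own pid before the setup and exec portions.
File names here are hypothetical, and the exact quoting comes from
addCommand, which is outside this hunk:

    List<String> cmd = Arrays.asList("java", "MyTask");
    List<String> wrapped = TaskLog.captureOutAndError(null, cmd,
        new File("/tmp/stdout"), new File("/tmp/stderr"), 0L, "/tmp/task.pid");
    // wrapped is roughly:
    //   [bash, -c, echo $$ > /tmp/task.pid ; 'java' 'MyTask'
    //    > /tmp/stdout 2> /tmp/stderr]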

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Mon Sep  8 21:38:01 2008
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
+
+/**
+ * Manages memory usage of tasks running under this TT. Kills any task-trees
+ * that overstep their memory limits.
+ */
+class TaskMemoryManagerThread extends Thread {
+
+  private static Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
+
+  private TaskTracker taskTracker;
+  private long monitoringInterval;
+  private long sleepTimeBeforeSigKill;
+
+  private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
+  private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
+  private List<TaskAttemptID> tasksToBeRemoved;
+
+  public TaskMemoryManagerThread(TaskTracker taskTracker) {
+    this.taskTracker = taskTracker;
+    setName(this.getClass().getName());
+
+    processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
+    tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
+    tasksToBeRemoved = new ArrayList<TaskAttemptID>();
+
+    monitoringInterval = taskTracker.getJobConf().getLong(
+        "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
+    sleepTimeBeforeSigKill = taskTracker.getJobConf().getLong(
+        "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
+        ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+  }
+
+  public void addTask(TaskAttemptID tid, long memLimit) {
+    synchronized (tasksToBeAdded) {
+      LOG.debug("Tracking ProcessTree " + tid + " for the first time");
+      // TODO: Negative values must have been checked in JobConf.
+      memLimit = (memLimit < 0 ? JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+          : memLimit);
+      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
+          sleepTimeBeforeSigKill);
+      tasksToBeAdded.put(tid, ptInfo);
+    }
+  }
+
+  public void removeTask(TaskAttemptID tid) {
+    synchronized (tasksToBeRemoved) {
+      tasksToBeRemoved.add(tid);
+    }
+  }
+
+  private static class ProcessTreeInfo {
+    private TaskAttemptID tid;
+    private String pid;
+    private ProcfsBasedProcessTree pTree;
+    private long memLimit;
+
+    public ProcessTreeInfo(TaskAttemptID tid, String pid,
+        ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
+      this.tid = tid;
+      this.pid = pid;
+      this.pTree = pTree;
+      if (this.pTree != null) {
+        this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
+      }
+      this.memLimit = memLimit;
+    }
+
+    public TaskAttemptID getTID() {
+      return tid;
+    }
+
+    public String getPID() {
+      return pid;
+    }
+
+    public void setPid(String pid) {
+      this.pid = pid;
+    }
+
+    public ProcfsBasedProcessTree getProcessTree() {
+      return pTree;
+    }
+
+    public void setProcessTree(ProcfsBasedProcessTree pTree) {
+      this.pTree = pTree;
+    }
+
+    public long getMemLimit() {
+      return memLimit;
+    }
+  }
+
+  @Override
+  public void run() {
+
+    LOG.info("Starting thread: " + this.getClass());
+
+    while (true) {
+      // Print the processTrees for debugging.
+      if (LOG.isDebugEnabled()) {
+        StringBuffer tmp = new StringBuffer("[ ");
+        for (ProcessTreeInfo p : processTreeInfoMap.values()) {
+          tmp.append(p.getPID());
+          tmp.append(" ");
+        }
+        LOG.debug("Current ProcessTree list : "
+            + tmp.substring(0, tmp.length()) + "]");
+      }
+
+      //Add new Tasks
+      synchronized (tasksToBeAdded) {
+        processTreeInfoMap.putAll(tasksToBeAdded);
+        tasksToBeAdded.clear();
+      }
+
+      //Remove finished Tasks
+      synchronized (tasksToBeRemoved) {
+        for (TaskAttemptID tid : tasksToBeRemoved) {
+          processTreeInfoMap.remove(tid);
+        }
+        tasksToBeRemoved.clear();
+      }
+
+      // Now, check memory usage and kill any overflowing tasks
+      for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it = processTreeInfoMap
+          .entrySet().iterator(); it.hasNext();) {
+
+        Map.Entry<TaskAttemptID, ProcessTreeInfo> entry = it.next();
+        TaskAttemptID tid = entry.getKey();
+        ProcessTreeInfo ptInfo = entry.getValue();
+        String pId = ptInfo.getPID();
+
+        // Initialize any uninitialized processTrees
+        if (pId == null) {
+          pId = getPid(tid); // get pid from pid-file
+          // PID will be null either if the pid-file is yet to be created, or
+          // if the TIP is finished and we removed the pid-file, but the TIP
+          // itself is still retained in runningTasks until successful
+          // transmission to the JT.
+          if (pId != null) {
+            // create process tree object
+            ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
+            LOG.debug("Tracking ProcessTree " + pId + " for the first time");
+
+            ptInfo.setPid(pId);
+            ptInfo.setProcessTree(pt);
+            processTreeInfoMap.put(tid, ptInfo);
+          }
+        }
+        // End of initializing any uninitialized processTrees
+
+        if (pId == null) {
+          continue; // processTree cannot be tracked
+        }
+
+        LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
+            + tid);
+        ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+        pTree = pTree.getProcessTree(); // get the updated process-tree
+        ptInfo.setProcessTree(pTree); // update ptInfo with the process-tree
+                                      // in its refreshed state
+        long currentMemUsage = pTree.getCumulativeVmem();
+        long limit = ptInfo.getMemLimit();
+        LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage
+            + ". Limit : " + limit);
+
+        if (limit != JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+            && currentMemUsage > limit) {
+          // Task (the root process) is still alive and overflowing memory.
+          // Clean up.
+          String msg = "TaskTree [pid=" + pId + ",tipID=" + tid
+              + "] is running beyond memory-limits. Current usage : "
+              + currentMemUsage + ". Limit : " + limit + ". Killing task.";
+          LOG.warn(msg);
+          taskTracker.cleanUpOverMemoryTask(tid, msg);
+
+          // Now destroy the ProcessTree, remove it from monitoring map.
+          pTree.destroy();
+          it.remove();
+          LOG.info("Removed ProcessTree with root " + pId);
+        }
+      }
+
+      // Sleep for some time before beginning next cycle
+      try {
+        LOG.debug(this.getClass() + " : Sleeping for " + monitoringInterval
+            + " ms");
+        Thread.sleep(monitoringInterval);
+      } catch (InterruptedException ie) {
+        LOG.warn(this.getClass()
+            + " interrupted. Finishing the thread and returning.");
+        return;
+      }
+    }
+  }
+
+  /**
+   * Load pid of the task from the pidFile.
+   * 
+   * @param tipID
+   * @return the pid of the task process.
+   */
+  private String getPid(TaskAttemptID tipID) {
+    Path pidFileName = getPidFilePath(tipID);
+    if (pidFileName == null) {
+      return null;
+    }
+    return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
+  }
+
+  private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+  /**
+   * Get the pidFile path of a Task
+   * @param tipID
+   * @return pidFile's Path
+   */
+  Path getPidFilePath(TaskAttemptID tipID) {
+    Path pidFileName = null;
+    try {
+      pidFileName = lDirAlloc.getLocalPathToRead(
+          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
+          taskTracker.getJobConf());
+    } catch (IOException i) {
+      // PID file is not there
+      LOG.debug("Failed to get pidFile name for " + tipID);
+    }
+    return pidFileName;
+  }
+}
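
How the TaskTracker drives this thread, per the TaskTracker.java changes
below; taskTracker, tid and memLimitBytes stand for values the tracker
already has:

    TaskMemoryManagerThread taskMemoryManager =
        new TaskMemoryManagerThread(taskTracker);
    taskMemoryManager.setDaemon(true);
    taskMemoryManager.start();
    taskMemoryManager.addTask(tid, memLimitBytes); // at task localization
    // and later, when the task is done:
    taskMemoryManager.removeTask(tid);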

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Mon Sep  8 21:38:01 2008
@@ -373,6 +373,13 @@
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
+      String pidFile = null;
+      if (tracker.isTaskMemoryManagerEnabled()) {
+        pidFile = lDirAlloc.getLocalPathForWrite(
+            (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
+            this.conf).toString();
+      }
+
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
       List<String> setup = null;
@@ -389,7 +396,8 @@
       stdout.getParentFile().mkdirs();
       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
       List<String> wrappedCommand = 
-        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
+        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize, pidFile);
+      LOG.debug("child jvm command : " + wrappedCommand.toString());
       Map<String, String> env = new HashMap<String, String>();
       StringBuffer ldLibraryPath = new StringBuffer();
       ldLibraryPath.append(workDir.toString());
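
The pid-file itself lands under one of the mapred.local.dir roots, in the
new "taskTracker/pids" sub-directory introduced in TaskTracker.java below;
for example (hypothetical attempt id and local dir):

    // <local.dir>/taskTracker/pids/attempt_200809082138_0001_m_000000_0
    String subPath = TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid;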

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Sep  8 21:38:01 2008
@@ -75,6 +75,7 @@
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -169,6 +170,7 @@
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
+  private static final String PIDDIR = "pids";
   private JobConf originalConf;
   private JobConf fConf;
   private int maxCurrentMapTasks;
@@ -178,6 +180,8 @@
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
+  private TaskMemoryManagerThread taskMemoryManager;
+  private boolean taskMemoryManagerEnabled = false;
   private long maxVirtualMemoryForTasks 
                                     = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
   private long defaultMemoryPerTask = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
@@ -342,6 +346,10 @@
   static String getJobCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
+
+  static String getPidFilesSubdir() {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+  }
     
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
@@ -453,6 +461,13 @@
                                     (maxCurrentMapTasks + 
                                         maxCurrentReduceTasks);
     }
+    // start the taskMemoryManager thread only if enabled
+    setTaskMemoryManagerEnabledFlag();
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager = new TaskMemoryManagerThread(this);
+      taskMemoryManager.setDaemon(true);
+      taskMemoryManager.start();
+    }
     this.running = true;
   }
   
@@ -1447,6 +1462,9 @@
     }
     try {
       localizeJob(tip);
+      if (isTaskMemoryManagerEnabled()) {
+        taskMemoryManager.addTask(t.getTaskID(), getMemoryForTask(tip));
+      }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2036,6 +2054,15 @@
     void cleanup(boolean needCleanup) throws IOException {
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
+
+      // Remove the associated pid-file, if any
+      if (TaskTracker.this.isTaskMemoryManagerEnabled()) {
+        Path pidFilePath = taskMemoryManager.getPidFilePath(taskId);
+        if (pidFilePath != null) {
+          directoryCleanupThread.addToQueue(pidFilePath);
+        }
+      }
+
       synchronized (TaskTracker.this) {
         if (needCleanup) {
           tasks.remove(taskId);
@@ -2240,6 +2267,10 @@
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
+    // Remove the entry from taskMemoryManagerThread's data structures.
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.removeTask(taskid);
+    }
   }
 
   /**
@@ -2702,4 +2733,48 @@
     return maxCurrentReduceTasks;
   }
 
+  /**
+   * Is the TaskMemoryManager enabled on this system?
+   * @return true if enabled, false otherwise.
+   */
+  boolean isTaskMemoryManagerEnabled() {
+    return taskMemoryManagerEnabled;
+  }
+
+  private void setTaskMemoryManagerEnabledFlag() {
+    if (!ProcfsBasedProcessTree.isAvailable()) {
+      LOG.info("ProcessTree implementation is missing on this system. "
+          + "TaskMemoryManager is disabled.");
+      taskMemoryManagerEnabled = false;
+      return;
+    }
+
+    Long tasksMaxMem = getMaxVirtualMemoryForTasks();
+    if (tasksMaxMem == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
+      LOG.info("TaskTracker's tasksMaxMem is not set. TaskMemoryManager is "
+          + "disabled.");
+      taskMemoryManagerEnabled = false;
+      return;
+    }
+
+    taskMemoryManagerEnabled = true;
+  }
+
+  /**
+   * Clean up a task when the TaskMemoryManagerThread requests it.
+   * @param tid
+   * @param diagnosticMsg
+   */
+  synchronized void cleanUpOverMemoryTask(TaskAttemptID tid,
+      String diagnosticMsg) {
+    TaskInProgress tip = runningTasks.get(tid);
+    if (tip != null) {
+      tip.reportDiagnosticInfo(diagnosticMsg);
+      try {
+        purgeTask(tip, true); // Marking it as failure.
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+      }
+    }
+  }
 }
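
Per setTaskMemoryManagerEnabledFlag() above, the manager stays disabled
unless procfs is available and a cluster-wide task memory limit is
configured. A sketch of turning it on, as the tests below do:

    JobConf fConf = new JobConf();
    fConf.setMaxVirtualMemoryForTasks(10000000000L); // bytes; leaving this
                                                     // unset keeps the
                                                     // TaskMemoryManager off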

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Mon Sep  8 21:38:01 2008
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestTaskTrackerMemoryManager extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
+  private MiniDFSCluster miniDFSCluster;
+  private MiniMRCluster miniMRCluster;
+
+  private void startCluster(JobConf conf) throws Exception {
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSys = miniDFSCluster.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
+  }
+
+  @Override
+  protected void tearDown() {
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
+    if (miniDFSCluster != null) {
+      miniDFSCluster.shutdown();
+    }
+  }
+
+  private void runWordCount(JobConf conf) throws Exception {
+    Path input = new Path("input.txt");
+    Path output = new Path("output");
+
+    OutputStream os = miniDFSCluster.getFileSystem().create(input);
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    Tool wordCount = new WordCount();
+    if (conf != null) {
+      wordCount.setConf(conf);
+    }
+    ToolRunner.run(wordCount, new String[] { input.toString(),
+        output.toString() });
+  }
+
+  public void testNormalTaskAndLimitedTT() throws Exception {
+    // Run the test only if memory management is enabled
+
+    try {
+      if (!ProcfsBasedProcessTree.isAvailable()) {
+        LOG.info("Currently ProcessTree has only one implementation "
+            + "ProcfsBasedProcessTree, which is not available on this "
+            + "system. Not testing");
+        return;
+      }
+    } catch (Exception e) {
+      LOG.info(StringUtils.stringifyException(e));
+      return;
+    }
+
+    Pattern diagMsgPattern = Pattern
+        .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
+            + "memory-limits. Current usage : [0-9]*. Limit : [0-9]*. Killing task.");
+    Matcher mat = null;
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+
+    fConf.setMaxVirtualMemoryForTasks(Long.valueOf(10000000000L)); // Fairly large value for WordCount to succeed
+    startCluster(fConf);
+
+    // Set up job.
+    JobConf conf = new JobConf();
+    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
+    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+        + jt.getTrackerPort());
+    NameNode nn = miniDFSCluster.getNameNode();
+    conf.set("fs.default.name", "hdfs://"
+        + nn.getNameNodeAddress().getHostName() + ":"
+        + nn.getNameNodeAddress().getPort());
+
+    // Start the job.
+    boolean success = true;
+    try {
+      runWordCount(conf);
+      success = true;
+    } catch (Exception e) {
+      success = false;
+    }
+
+    // Job has to succeed
+    assertTrue(success);
+
+    // Alas, we don't have a way to get job id/Task completion events from
+    // WordCount
+    JobClient jClient = new JobClient(conf);
+    JobStatus[] jStatus = jClient.getAllJobs();
+    JobStatus js = jStatus[0]; // Our only job
+    RunningJob rj = jClient.getJob(js.getJobID());
+
+    // All events
+    TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
+
+    for (TaskCompletionEvent tce : taskComplEvents) {
+      String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
+          .getTaskAttemptId());
+
+      if (diagnostics != null) {
+        for (String str : diagnostics) {
+          mat = diagMsgPattern.matcher(str);
+          // The error pattern shouldn't be there in any TIP's diagnostics
+          assertFalse(mat.find());
+        }
+      }
+    }
+  }
+
+  public void testOOMTaskAndLimitedTT() throws Exception {
+
+    // Run the test only if memory management is enabled
+
+    try {
+      if (!ProcfsBasedProcessTree.isAvailable()) {
+        LOG.info("Currently ProcessTree has only one implementation "
+            + "ProcfsBasedProcessTree, which is not available on this "
+            + "system. Not testing");
+        return;
+      }
+    } catch (Exception e) {
+      LOG.info(StringUtils.stringifyException(e));
+      return;
+    }
+
+    long PER_TASK_LIMIT = 444; // Low enough to kill off WordCount.
+    Pattern diagMsgPattern = Pattern
+        .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
+            + "memory-limits. Current usage : [0-9]*. Limit : "
+            + PER_TASK_LIMIT + ". Killing task.");
+    Matcher mat = null;
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+    fConf.setMaxVirtualMemoryForTasks(Long.valueOf(100000));
+    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
+            //very small value, so that no task escapes to successful completion.
+    startCluster(fConf);
+
+    // Set up job.
+    JobConf conf = new JobConf();
+    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
+    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+        + jt.getTrackerPort());
+    NameNode nn = miniDFSCluster.getNameNode();
+    conf.set("fs.default.name", "hdfs://"
+        + nn.getNameNodeAddress().getHostName() + ":"
+        + nn.getNameNodeAddress().getPort());
+
+    // Start the job.
+    boolean success = true;
+    try {
+      runWordCount(conf);
+      success = true;
+    } catch (Exception e) {
+      success = false;
+    }
+
+    // Job has to fail
+    assertFalse(success);
+
+    // Alas, we don't have a way to get job id/Task completion events from
+    // WordCount
+    JobClient jClient = new JobClient(conf);
+    JobStatus[] jStatus = jClient.getAllJobs();
+    JobStatus js = jStatus[0]; // Our only job
+    RunningJob rj = jClient.getJob(js.getJobID());
+
+    // All events
+    TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
+
+    for (TaskCompletionEvent tce : taskComplEvents) {
+      // Every task HAS to fail. Use assertTrue instead of the Java assert
+      // keyword, which is silently skipped unless the JVM runs with -ea.
+      assertTrue(tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED
+          || tce.getTaskStatus() == TaskCompletionEvent.Status.FAILED);
+
+      String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
+          .getTaskAttemptId());
+
+      // Every task HAS to spit out the out-of-memory errors
+      assertNotNull(diagnostics);
+
+      for (String str : diagnostics) {
+        mat = diagMsgPattern.matcher(str);
+        // Every task HAS to spit out the out-of-memory errors in the same
+        // format. And these are the only diagnostic messages.
+        assertTrue(mat.find());
+      }
+    }
+  }
+}
\ No newline at end of file
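
The regex in these tests has to stay in sync with the message built in
TaskMemoryManagerThread; a quick standalone check with made-up values:

    Pattern p = Pattern.compile(
        "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
            + "memory-limits. Current usage : [0-9]*. Limit : [0-9]*."
            + " Killing task.");
    String msg = "TaskTree [pid=1234,tipID=attempt_200809082138_0001_m_000000_0]"
        + " is running beyond memory-limits. Current usage : 2048."
        + " Limit : 1024. Killing task.";
    System.out.println(p.matcher(msg).find()); // prints: true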

Added: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Mon Sep  8 21:38:01 2008
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+import junit.framework.TestCase;
+
+public class TestProcfsBasedProcessTree extends TestCase {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestProcfsBasedProcessTree.class);
+  private ShellCommandExecutor shexec = null;
+  private String pidFile;
+  private String shellScript;
+  private static final int N = 10; // Controls the RogueTask
+
+  private static final int memoryLimit = 15000000; // bytes
+  private static final long PROCESSTREE_RECONSTRUCTION_INTERVAL =
+    ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL; // msec
+
+  private class RogueTaskThread extends Thread {
+    public void run() {
+      try {
+        String args[] = { "bash", "-c",
+            "echo $$ > " + pidFile + "; sh " + shellScript + " " + N + ";" };
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (ExitCodeException ee) {
+        LOG.info("Shell Command exit with a non-zero exit code. " + ee);
+      } catch (IOException ioe) {
+        LOG.info("Error executing shell command " + ioe);
+      } finally {
+        LOG.info("Exit code: " + shexec.getExitCode());
+      }
+    }
+  }
+
+  private String getRogueTaskPID() {
+    File f = new File(pidFile);
+    while (!f.exists()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    // read from pidFile
+    return ProcfsBasedProcessTree.getPidFromPidFile(pidFile);
+  }
+
+  public void testProcessTree() {
+
+    try {
+      if (!ProcfsBasedProcessTree.isAvailable()) {
+        System.out
+            .println("ProcfsBasedProcessTree is not available on this system. Not testing");
+        return;
+      }
+    } catch (Exception e) {
+      LOG.info(StringUtils.stringifyException(e));
+      return;
+    }
+    // create shell script
+    Random rm = new Random();
+    File tempFile = new File(this.getName() + "_shellScript_" + rm.nextInt()
+        + ".sh");
+    tempFile.deleteOnExit();
+    shellScript = tempFile.getName();
+
+    // create pid file
+    tempFile = new File(this.getName() + "_pidFile_" + rm.nextInt() + ".pid");
+    tempFile.deleteOnExit();
+    pidFile = tempFile.getName();
+
+    // write to shell-script
+    try {
+      FileWriter fWriter = new FileWriter(shellScript);
+      fWriter.write(
+          "# rogue task\n" +
+          "sleep 10\n" +
+          "echo hello\n" +
+          "if [ $1 -ne 0 ]\n" +
+          "then\n" +
+          " sh " + shellScript + " $(($1-1))\n" +
+          "fi");
+      fWriter.close();
+    } catch (IOException ioe) {
+      LOG.info("Error: " + ioe);
+      return;
+    }
+
+    Thread t = new RogueTaskThread();
+    t.start();
+    String pid = getRogueTaskPID();
+    LOG.info("Root process pid: " + pid);
+    ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid);
+    p = p.getProcessTree(); // initialize
+    try {
+      while (true) {
+        LOG.info("ProcessTree: " + p.toString());
+        long mem = p.getCumulativeVmem();
+        LOG.info("Memory usage: " + mem + "bytes.");
+        if (mem > memoryLimit) {
+          p.destroy();
+          break;
+        }
+        Thread.sleep(PROCESSTREE_RECONSTRUCTION_INTERVAL);
+        p = p.getProcessTree(); // reconstruct
+      }
+    } catch (InterruptedException ie) {
+      LOG.info("Interrupted.");
+    }
+
+    assertFalse(p.isAlive()); // process-tree should be gone
+    // Not able to join thread sometimes when forking with large N.
+    try {
+      t.join(2000);
+      LOG.info("RogueTaskThread successfully joined.");
+    } catch (InterruptedException ie) {
+      LOG.info("Interrupted while joining RogueTaskThread.");
+    }
+  }
+}
\ No newline at end of file