You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/09 06:38:02 UTC
svn commit: r693359 - in /hadoop/core/trunk: ./ conf/
src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/util/
Author: ddas
Date: Mon Sep 8 21:38:01 2008
New Revision: 693359
URL: http://svn.apache.org/viewvc?rev=693359&view=rev
Log:
HADOOP-3581. Prevents memory intensive user tasks from taking down nodes. Contributed by Vinod K V.
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 8 21:38:01 2008
@@ -135,6 +135,9 @@
HADOOP-3941. Extend FileSystem API to return file-checksums.
(szetszwo)
+ HADOOP-3581. Prevents memory intensive user tasks from taking down
+ nodes. (Vinod K V via ddas)
+
IMPROVEMENTS
HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Mon Sep 8 21:38:01 2008
@@ -723,6 +723,21 @@
</property>
<property>
+ <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
+ <value>5000</value>
+ <description>The interval, in milliseconds, that the tasktracker waits
+ between two consecutive cycles of monitoring its tasks' memory usage.</description>
+</property>
+
+<property>
+ <name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name>
+ <value>5000</value>
+ <description>The time, in milliseconds, that the tasktracker waits after
+ sending a SIGTERM to a process that has overrun its memory limit before
+ sending it a SIGKILL (if it is still alive).</description>
+</property>
+
+<property>
<name>mapred.map.tasks</name>
<value>2</value>
<description>The default number of map tasks per job. Typically set
Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Mon Sep 8 21:38:01 2008
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A Proc file-system based ProcfsBasedProcessTree. Works on Linux and Cygwin.
+ */
+public class ProcfsBasedProcessTree {
+
+  private static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.mapred.ProcfsBasedProcessTree");
+
+  private static final String PROCFS = "/proc/";
+
+  /** Default delay, in milliseconds, between SIGTERM and SIGKILL in destroy(). */
+  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+
+  // Delay, in milliseconds, between the SIGTERM sent by destroy() and the
+  // follow-up SIGKILL sent by SigKillThread.
+  private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+
+  // Parses the single line of /proc/<pid>/stat (field layout per proc(5)).
+  // Capturing groups used below:
+  //   2 = comm including its parentheses, 3 = ppid, 4 = pgrp, 5 = session,
+  //   7 = vsize (stat field 23, in bytes).
+  // comm is matched greedily up to the LAST ')' followed by the state field,
+  // because command names may themselves contain spaces or parentheses.
+  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
+      .compile("^([0-9-]+)\\s(\\(.*\\))\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
+
+  // Root PID of the tree, or -1 when the PID given to the constructor was
+  // not a positive integer.
+  private Integer pid = -1;
+
+  // PID -> info for the root and all its descendants, as of the last call
+  // to getProcessTree().
+  private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
+
+  /**
+   * Create a process-tree rooted at the given PID. The class performs no
+   * internal synchronization.
+   *
+   * @param pid root PID as a string; anything that does not parse to a
+   *          positive integer yields a tree that is empty and never alive.
+   */
+  public ProcfsBasedProcessTree(String pid) {
+    this.pid = getValidPID(pid);
+  }
+
+  /**
+   * Set the delay between the SIGTERM and the SIGKILL sent by destroy().
+   *
+   * @param interval delay in milliseconds
+   */
+  public void setSigKillInterval(long interval) {
+    sleepTimeBeforeSigKill = interval;
+  }
+
+  /**
+   * Checks if the ProcfsBasedProcessTree is available on this system. Only
+   * the JVM's "os.name" property is inspected: names starting with "Linux"
+   * or "Windows" are accepted (Windows presumably meaning Cygwin, which
+   * provides /proc).
+   *
+   * @return true if ProcfsBasedProcessTree is available. False otherwise.
+   */
+  public static boolean isAvailable() {
+    String osName;
+    try {
+      osName = System.getProperty("os.name");
+    } catch (SecurityException se) {
+      LOG.warn("Failed to get Operating System name. " + se);
+      return false;
+    }
+    if (osName == null
+        || (!osName.startsWith("Linux") && !osName.startsWith("Windows"))) {
+      LOG.info("ProcfsBasedProcessTree currently is supported only "
+          + "on Linux and Windows");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Get the process-tree with latest state. Re-scans /proc and rebuilds the
+   * set of processes descending from the root PID. If the root process can
+   * no longer be found (e.g. it has exited), the tree becomes empty.
+   *
+   * @return this object, updated in place.
+   */
+  public ProcfsBasedProcessTree getProcessTree() {
+    if (pid == -1) {
+      return this;
+    }
+    processTree.clear();
+
+    // Snapshot every process whose stat file could be read and parsed.
+    // Unparseable entries are skipped so that no half-initialized
+    // ProcessInfo (null ppid/vmem) ever reaches the tree.
+    Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+    for (Integer proc : getProcessList()) {
+      ProcessInfo pInfo = constructProcessInfo(new ProcessInfo(proc));
+      if (pInfo != null) {
+        allProcessInfo.put(proc, pInfo);
+      }
+    }
+
+    ProcessInfo me = allProcessInfo.get(this.pid);
+    if (me == null) {
+      // The root process vanished (or its stat was unreadable): empty tree.
+      LOG.debug("Process " + pid + " not found in " + PROCFS);
+      return this;
+    }
+    processTree.put(this.pid, me);
+
+    // Link each process to its parent. PID 1 is never attached to a parent
+    // (presumably because its PPID 0 has no /proc entry).
+    for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
+      if (entry.getKey() != 1) {
+        ProcessInfo pInfo = entry.getValue();
+        ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
+        if (parentPInfo != null) {
+          parentPInfo.addChild(pInfo);
+        }
+      }
+    }
+
+    // Breadth-first walk from the root. Children are only expanded the first
+    // time a node is reached, so a cyclic (inconsistent) snapshot caused by
+    // PID reuse during the scan cannot loop forever.
+    LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>(me.getChildren());
+    while (!pInfoQueue.isEmpty()) {
+      ProcessInfo pInfo = pInfoQueue.remove();
+      if (!processTree.containsKey(pInfo.getPid())) {
+        processTree.put(pInfo.getPid(), pInfo);
+        pInfoQueue.addAll(pInfo.getChildren());
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Log.debug the ProcfsBasedProcessTree
+      LOG.debug(this.toString());
+    }
+    return this;
+  }
+
+  /**
+   * Is the process-tree alive? Currently we care only about the status of the
+   * root-process.
+   *
+   * @return true if the process-tree is alive, false otherwise.
+   */
+  public boolean isAlive() {
+    if (pid == -1) {
+      return false;
+    }
+    return this.isAlive(pid);
+  }
+
+  /**
+   * Destroy the process-tree. Currently we only make sure the root process is
+   * gone. It is the responsibility of the root process to make sure that all
+   * its descendants are cleaned up.
+   *
+   * Sends SIGTERM to the root (if alive) and then starts a daemon thread that,
+   * after sleepTimeBeforeSigKill ms, sends SIGKILL if the root is still alive.
+   */
+  public void destroy() {
+    LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
+    if (pid == -1) {
+      return;
+    }
+
+    if (isAlive(this.pid)) {
+      String[] args = { "kill", this.pid.toString() };
+      // Created outside the try so the finally block can never see null.
+      ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+      try {
+        shexec.execute();
+      } catch (IOException ioe) {
+        LOG.warn("Error executing shell command " + ioe);
+      } finally {
+        LOG.info("Killing " + pid + " with SIGTERM. Exit code "
+            + shexec.getExitCode());
+      }
+    }
+
+    SigKillThread sigKillThread = new SigKillThread();
+    sigKillThread.setDaemon(true);
+    sigKillThread.start();
+  }
+
+  /**
+   * Get the cumulative virtual memory used by all the processes in the
+   * process-tree, as of the last call to getProcessTree().
+   *
+   * @return cumulative virtual memory used by the process-tree in bytes.
+   */
+  public long getCumulativeVmem() {
+    long total = 0;
+    for (ProcessInfo p : processTree.values()) {
+      // Guard against entries whose vmem was never parsed.
+      if (p != null && p.getVmem() != null) {
+        total += p.getVmem();
+      }
+    }
+    return total;
+  }
+
+  /**
+   * Get PID from a pid-file.
+   *
+   * @param pidFileName
+   *          Name of the pid-file.
+   * @return the first line of the pid-file. Returns null if the pidFileName
+   *         points to a non-existing file, if the file is empty, or if the
+   *         read fails.
+   */
+  public static String getPidFromPidFile(String pidFileName) {
+    BufferedReader pidFile;
+    try {
+      pidFile = new BufferedReader(new FileReader(pidFileName));
+    } catch (FileNotFoundException f) {
+      LOG.debug("PidFile doesn't exist : " + pidFileName);
+      return null;
+    }
+
+    String pid = null;
+    try {
+      pid = pidFile.readLine();
+    } catch (IOException i) {
+      LOG.error("Failed to read from " + pidFileName);
+    } finally {
+      // Closing the BufferedReader also closes the wrapped FileReader.
+      try {
+        pidFile.close();
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + pidFile);
+      }
+    }
+    return pid;
+  }
+
+  /**
+   * @return pid parsed as a positive integer, or -1 if it is null, not a
+   *         number, or not positive.
+   */
+  private Integer getValidPID(String pid) {
+    try {
+      int parsed = Integer.parseInt(pid);
+      return parsed > 0 ? parsed : -1;
+    } catch (NumberFormatException nfe) {
+      return -1;
+    }
+  }
+
+  /**
+   * Get the list of all processes in the system, i.e. every numeric
+   * directory directly under /proc. Returns an empty list if /proc cannot be
+   * listed.
+   */
+  private List<Integer> getProcessList() {
+    List<Integer> processList = new ArrayList<Integer>();
+    String[] processDirs = (new File(PROCFS)).list();
+    if (processDirs == null) {
+      // File.list() returns null when the directory is missing or unreadable.
+      LOG.warn("Unable to list " + PROCFS);
+      return processList;
+    }
+
+    for (String dir : processDirs) {
+      try {
+        int pd = Integer.parseInt(dir);
+        if ((new File(PROCFS + dir)).isDirectory()) {
+          processList.add(Integer.valueOf(pd));
+        }
+      } catch (NumberFormatException n) {
+        // skip this directory
+      } catch (SecurityException s) {
+        // skip this process
+      }
+    }
+    return processList;
+  }
+
+  /**
+   * Fill in pinfo from /proc/<pid>/stat.
+   *
+   * @return pinfo if its stat line was read and parsed; null if the process
+   *         vanished, the file could not be read, or the line did not have
+   *         the expected format.
+   */
+  private ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+    String statFile = PROCFS + pinfo.getPid() + "/stat";
+    BufferedReader in;
+    try {
+      in = new BufferedReader(new FileReader(statFile));
+    } catch (FileNotFoundException f) {
+      // The process vanished in the interim!
+      return null;
+    }
+
+    ProcessInfo ret = null;
+    try {
+      String str = in.readLine(); // only one line
+      if (str != null) {
+        Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
+        if (m.find()) {
+          // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
+          pinfo.update(m.group(2), Integer.parseInt(m.group(3)),
+              Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
+              Long.parseLong(m.group(7)));
+          ret = pinfo;
+        } else {
+          LOG.debug("Unexpected format in " + statFile + " : " + str);
+        }
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Error parsing " + statFile + " : " + nfe);
+    } finally {
+      // Closing the BufferedReader also closes the wrapped FileReader.
+      try {
+        in.close();
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + in);
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Is the process with PID pid still alive? Runs "kill -0 pid"; any
+   * non-zero exit or execution failure counts as not alive.
+   */
+  private boolean isAlive(Integer pid) {
+    // This method assumes that isAlive is called on a pid that was alive not
+    // too long ago, and hence assumes no chance of pid-wrapping-around.
+    String[] args = { "kill", "-0", pid.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+    try {
+      shexec.execute();
+    } catch (ExitCodeException ee) {
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command "
+          + Arrays.toString(args) + ioe);
+      return false;
+    }
+    return shexec.getExitCode() == 0;
+  }
+
+  /**
+   * Helper thread class that kills process-tree with SIGKILL in background,
+   * after sleeping sleepTimeBeforeSigKill ms, if the root is still alive.
+   */
+  private class SigKillThread extends Thread {
+
+    @Override
+    public void run() {
+      this.setName(this.getClass().getName() + "-" + String.valueOf(pid));
+
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        // Deliberately proceed to the SIGKILL check; re-setting the interrupt
+        // flag here could make the shell execution below fail.
+        LOG.warn("Thread sleep is interrupted.");
+      }
+
+      // Kill the root process with SIGKILL if it is still alive
+      if (ProcfsBasedProcessTree.this.isAlive(pid)) {
+        String[] args = { "kill", "-9", pid.toString() };
+        ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+        try {
+          shexec.execute();
+        } catch (IOException ioe) {
+          LOG.warn("Error executing shell command " + ioe);
+        } finally {
+          LOG.info("Killing " + pid + " with SIGKILL. Exit code "
+              + shexec.getExitCode());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a string printing PIDs of process present in the
+   * ProcfsBasedProcessTree. Output format : [pid pid ..]
+   */
+  @Override
+  public String toString() {
+    StringBuilder pTree = new StringBuilder("[ ");
+    for (Integer p : processTree.keySet()) {
+      pTree.append(p).append(' ');
+    }
+    return pTree.append(']').toString();
+  }
+
+  /**
+   * Class containing information of a process, as parsed from its
+   * /proc/<pid>/stat line, plus the children linked by getProcessTree().
+   */
+  private static class ProcessInfo {
+    private Integer pid; // process-id
+    private String name; // command name (with surrounding parentheses)
+    private Integer pgrpId; // process group-id
+    private Integer ppid; // parent process-id
+    private Integer sessionId; // session-id
+    private Long vmem; // virtual memory usage in bytes
+    private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
+
+    public ProcessInfo(int pid) {
+      this.pid = Integer.valueOf(pid);
+    }
+
+    public Integer getPid() {
+      return pid;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public Integer getPgrpId() {
+      return pgrpId;
+    }
+
+    public Integer getPpid() {
+      return ppid;
+    }
+
+    public Integer getSessionId() {
+      return sessionId;
+    }
+
+    public Long getVmem() {
+      return vmem;
+    }
+
+    /** @return true if this process is the parent of p. */
+    public boolean isParent(ProcessInfo p) {
+      return pid.equals(p.getPpid());
+    }
+
+    public void update(String name, Integer ppid, Integer pgrpId,
+        Integer sessionId, Long vmem) {
+      this.name = name;
+      this.ppid = ppid;
+      this.pgrpId = pgrpId;
+      this.sessionId = sessionId;
+      this.vmem = vmem;
+    }
+
+    public boolean addChild(ProcessInfo p) {
+      return children.add(p);
+    }
+
+    public List<ProcessInfo> getChildren() {
+      return children;
+    }
+  }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Mon Sep 8 21:38:01 2008
@@ -216,7 +216,7 @@
long tailLength
) throws IOException {
return captureOutAndError(null, cmd, stdoutFilename,
- stderrFilename, tailLength );
+ stderrFilename, tailLength, null );
}
/**
@@ -237,12 +237,44 @@
File stderrFilename,
long tailLength
) throws IOException {
+ return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
+ tailLength, null);
+ }
+
+ /**
+ * Wrap a command in a shell to capture stdout and stderr to files.
+ * Setup commands such as setting memory limit can be passed which
+ * will be executed before exec.
+ * If the tailLength is 0, the entire output will be saved.
+ * @param setup The setup commands for the execed process.
+ * @param cmd The command and the arguments that should be run
+ * @param stdoutFilename The filename that stdout should be saved to
+ * @param stderrFilename The filename that stderr should be saved to
+ * @param tailLength The length of the tail to be saved.
+ * @param pidFileName The name of the pid-file
+ * @return the modified command that should be run
+ */
+ public static List<String> captureOutAndError(List<String> setup,
+ List<String> cmd,
+ File stdoutFilename,
+ File stderrFilename,
+ long tailLength,
+ String pidFileName
+ ) throws IOException {
String stdout = FileUtil.makeShellPath(stdoutFilename);
String stderr = FileUtil.makeShellPath(stderrFilename);
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
StringBuffer mergedCmd = new StringBuffer();
+
+ // Spit out the pid to pidFileName
+ if (pidFileName != null) {
+ mergedCmd.append("echo $$ > ");
+ mergedCmd.append(pidFileName);
+ mergedCmd.append(" ;");
+ }
+
if (setup != null && setup.size() > 0) {
mergedCmd.append(addCommand(setup, false));
mergedCmd.append(";");
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Mon Sep 8 21:38:01 2008
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
+
+/**
+ * Manages memory usage of tasks running under this TT. Kills any task-trees
+ * that overflow and over-step memory limits.
+ */
+class TaskMemoryManagerThread extends Thread {
+
+  private static final Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
+
+  private final TaskTracker taskTracker;
+  // Milliseconds slept between two monitoring cycles.
+  private final long monitoringInterval;
+  // Milliseconds between the SIGTERM and SIGKILL sent to an over-limit tree.
+  private final long sleepTimeBeforeSigKill;
+
+  // Tasks currently monitored; read and written only inside run().
+  private final Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
+  // Hand-off structures filled by addTask()/removeTask() (guarded by their
+  // own monitors) and drained by run() at the start of every cycle.
+  private final Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
+  private final List<TaskAttemptID> tasksToBeRemoved;
+
+  private final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+  /**
+   * @param taskTracker tracker whose JobConf supplies the monitoring interval
+   *          and SIGKILL delay, and which is asked to fail over-limit tasks.
+   */
+  public TaskMemoryManagerThread(TaskTracker taskTracker) {
+    this.taskTracker = taskTracker;
+    setName(this.getClass().getName());
+
+    processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
+    tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
+    tasksToBeRemoved = new ArrayList<TaskAttemptID>();
+
+    monitoringInterval = taskTracker.getJobConf().getLong(
+        "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
+    sleepTimeBeforeSigKill = taskTracker.getJobConf().getLong(
+        "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
+        ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+  }
+
+  /**
+   * Queue a task for monitoring; it is picked up at the next cycle.
+   *
+   * @param tid task attempt to monitor
+   * @param memLimit virtual-memory limit in bytes; negative values are
+   *          treated as JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT.
+   */
+  public void addTask(TaskAttemptID tid, long memLimit) {
+    synchronized (tasksToBeAdded) {
+      LOG.debug("Tracking ProcessTree " + tid + " for the first time");
+      // TODO: Negative values must have been checked in JobConf.
+      memLimit = (memLimit < 0 ? JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+          : memLimit);
+      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
+          sleepTimeBeforeSigKill);
+      tasksToBeAdded.put(tid, ptInfo);
+    }
+  }
+
+  /**
+   * Queue a task for removal from monitoring at the next cycle.
+   *
+   * @param tid task attempt to stop monitoring
+   */
+  public void removeTask(TaskAttemptID tid) {
+    synchronized (tasksToBeRemoved) {
+      tasksToBeRemoved.add(tid);
+    }
+  }
+
+  /** Per-task monitoring state: PID and process-tree are filled in lazily. */
+  private static final class ProcessTreeInfo {
+    private final TaskAttemptID tid;
+    private String pid;
+    private ProcfsBasedProcessTree pTree;
+    private final long memLimit;
+    private final long sleepTimeBeforeSigKill;
+
+    public ProcessTreeInfo(TaskAttemptID tid, String pid,
+        ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
+      this.tid = tid;
+      this.pid = pid;
+      this.memLimit = memLimit;
+      this.sleepTimeBeforeSigKill = sleepTimeBeforeSigKill;
+      setProcessTree(pTree);
+    }
+
+    public TaskAttemptID getTID() {
+      return tid;
+    }
+
+    public String getPID() {
+      return pid;
+    }
+
+    public void setPid(String pid) {
+      this.pid = pid;
+    }
+
+    public ProcfsBasedProcessTree getProcessTree() {
+      return pTree;
+    }
+
+    /**
+     * Attach a process-tree and apply the configured SIGKILL delay to it.
+     * Trees are normally attached after construction (once the pid-file
+     * exists), so the delay must be applied here rather than only in the
+     * constructor, or the configured value would never take effect.
+     */
+    public void setProcessTree(ProcfsBasedProcessTree pTree) {
+      this.pTree = pTree;
+      if (pTree != null) {
+        pTree.setSigKillInterval(sleepTimeBeforeSigKill);
+      }
+    }
+
+    public long getMemLimit() {
+      return memLimit;
+    }
+  }
+
+  /**
+   * Monitoring loop. Every monitoringInterval ms it applies queued
+   * additions/removals, then checks each task's process-tree and fails any
+   * task whose cumulative virtual memory exceeds its limit. Returns when the
+   * thread is interrupted.
+   */
+  @Override
+  public void run() {
+
+    LOG.info("Starting thread: " + this.getClass());
+
+    while (true) {
+      // Print the processTrees for debugging.
+      if (LOG.isDebugEnabled()) {
+        StringBuilder tmp = new StringBuilder("[ ");
+        for (ProcessTreeInfo p : processTreeInfoMap.values()) {
+          tmp.append(p.getPID());
+          tmp.append(" ");
+        }
+        LOG.debug("Current ProcessTree list : " + tmp + "]");
+      }
+
+      // Add new Tasks
+      synchronized (tasksToBeAdded) {
+        processTreeInfoMap.putAll(tasksToBeAdded);
+        tasksToBeAdded.clear();
+      }
+
+      // Remove finished Tasks
+      synchronized (tasksToBeRemoved) {
+        for (TaskAttemptID tid : tasksToBeRemoved) {
+          processTreeInfoMap.remove(tid);
+        }
+        tasksToBeRemoved.clear();
+      }
+
+      // Now, check memory usage and kill any overflowing tasks
+      for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it =
+          processTreeInfoMap.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<TaskAttemptID, ProcessTreeInfo> entry = it.next();
+        TaskAttemptID tid = entry.getKey();
+        try {
+          if (killIfOverLimit(tid, entry.getValue())) {
+            it.remove();
+          }
+        } catch (Exception e) {
+          // A failure on one task must not kill this thread; otherwise memory
+          // limits would silently stop being enforced for every task.
+          LOG.warn("Error while monitoring memory usage of " + tid, e);
+        }
+      }
+
+      // Sleep for some time before beginning next cycle
+      try {
+        LOG.debug(this.getClass() + " : Sleeping for " + monitoringInterval
+            + " ms");
+        Thread.sleep(monitoringInterval);
+      } catch (InterruptedException ie) {
+        LOG.warn(this.getClass()
+            + " interrupted. Finishing the thread and returning.");
+        return;
+      }
+    }
+  }
+
+  /**
+   * Attach a process-tree to ptInfo once the task's pid-file can be read.
+   *
+   * @return true if ptInfo now has a PID and process-tree, false if the PID
+   *         is not (yet) known.
+   */
+  private boolean initializeProcessTree(TaskAttemptID tid,
+      ProcessTreeInfo ptInfo) {
+    if (ptInfo.getPID() != null) {
+      return true;
+    }
+    String pId = getPid(tid); // get pid from pid-file
+    if (pId == null) {
+      // PID will be null, either if the pid file is yet to be created
+      // or if the tip is finished and we removed pidFile, but the TIP
+      // itself is still retained in runningTasks till successful
+      // transmission to JT
+      return false;
+    }
+    // create process tree object
+    ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
+    LOG.debug("Tracking ProcessTree " + pId + " for the first time");
+    ptInfo.setPid(pId);
+    ptInfo.setProcessTree(pt);
+    return true;
+  }
+
+  /**
+   * Refresh the task's process-tree and, if its cumulative virtual memory is
+   * above a non-disabled limit, fail the task and destroy the tree.
+   *
+   * @return true if the task was killed and should stop being monitored.
+   */
+  private boolean killIfOverLimit(TaskAttemptID tid, ProcessTreeInfo ptInfo) {
+    if (!initializeProcessTree(tid, ptInfo)) {
+      return false; // processTree cannot be tracked
+    }
+    String pId = ptInfo.getPID();
+
+    LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
+        + tid);
+    // get the updated process-tree and store it back in ptInfo
+    ProcfsBasedProcessTree pTree = ptInfo.getProcessTree().getProcessTree();
+    ptInfo.setProcessTree(pTree);
+    long currentMemUsage = pTree.getCumulativeVmem();
+    long limit = ptInfo.getMemLimit();
+    LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage
+        + ". Limit : " + limit);
+
+    if (limit == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+        || currentMemUsage <= limit) {
+      return false;
+    }
+
+    // Task (the root process) is still alive and overflowing memory.
+    // Clean up.
+    String msg = "TaskTree [pid=" + pId + ",tipID=" + tid
+        + "] is running beyond memory-limits. Current usage : "
+        + currentMemUsage + ". Limit : " + limit + ". Killing task.";
+    LOG.warn(msg);
+    taskTracker.cleanUpOverMemoryTask(tid, msg);
+
+    // Now destroy the ProcessTree; the caller removes it from the map.
+    pTree.destroy();
+    LOG.info("Removed ProcessTree with root " + pId);
+    return true;
+  }
+
+  /**
+   * Load pid of the task from the pidFile.
+   *
+   * @param tipID
+   * @return the pid of the task process, or null if the pid-file cannot be
+   *         located or read.
+   */
+  private String getPid(TaskAttemptID tipID) {
+    Path pidFileName = getPidFilePath(tipID);
+    if (pidFileName == null) {
+      return null;
+    }
+    return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
+  }
+
+  /**
+   * Get the pidFile path of a Task
+   * @param tipID
+   * @return pidFile's Path, or null if it cannot be found in the local dirs.
+   */
+  Path getPidFilePath(TaskAttemptID tipID) {
+    Path pidFileName = null;
+    try {
+      pidFileName = lDirAlloc.getLocalPathToRead(
+          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
+          taskTracker.getJobConf());
+    } catch (IOException i) {
+      // PID file is not there
+      LOG.debug("Failed to get pidFile name for " + tipID);
+    }
+    return pidFileName;
+  }
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Mon Sep 8 21:38:01 2008
@@ -373,6 +373,13 @@
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid.toString()); // pass task identifier
+ String pidFile = null;
+ if (tracker.isTaskMemoryManagerEnabled()) {
+ pidFile = lDirAlloc.getLocalPathForWrite(
+ (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
+ this.conf).toString();
+ }
+
// set memory limit using ulimit if feasible and necessary ...
String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
List<String> setup = null;
@@ -389,7 +396,8 @@
stdout.getParentFile().mkdirs();
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
List<String> wrappedCommand =
- TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
+ TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize, pidFile);
+ LOG.debug("child jvm command : " + wrappedCommand.toString());
Map<String, String> env = new HashMap<String, String>();
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=693359&r1=693358&r2=693359&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Sep 8 21:38:01 2008
@@ -75,6 +75,7 @@
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
@@ -169,6 +170,7 @@
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
+ private static final String PIDDIR = "pids";
private JobConf originalConf;
private JobConf fConf;
private int maxCurrentMapTasks;
@@ -178,6 +180,8 @@
private MapEventsFetcherThread mapEventsFetcher;
int workerThreads;
private CleanupQueue directoryCleanupThread;
+ private TaskMemoryManagerThread taskMemoryManager;
+ private boolean taskMemoryManagerEnabled = false;
private long maxVirtualMemoryForTasks
= JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
private long defaultMemoryPerTask = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
@@ -342,6 +346,10 @@
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
+
+  /**
+   * Sub-directory, relative to the tracker's local dirs, under which the
+   * per-task pid-files are written (SUBDIR followed by PIDDIR).
+   *
+   * @return the pid-file sub-directory path
+   */
+  static String getPidFilesSubdir() {
+    String trackerDir = TaskTracker.SUBDIR;
+    return trackerDir + Path.SEPARATOR + PIDDIR;
+  }
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
@@ -453,6 +461,13 @@
(maxCurrentMapTasks +
maxCurrentReduceTasks);
}
+ // start the taskMemoryManager thread only if enabled
+ setTaskMemoryManagerEnabledFlag();
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager = new TaskMemoryManagerThread(this);
+ taskMemoryManager.setDaemon(true);
+ taskMemoryManager.start();
+ }
this.running = true;
}
@@ -1447,6 +1462,9 @@
}
try {
localizeJob(tip);
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.addTask(t.getTaskID(), getMemoryForTask(tip));
+ }
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -2036,6 +2054,15 @@
void cleanup(boolean needCleanup) throws IOException {
TaskAttemptID taskId = task.getTaskID();
LOG.debug("Cleaning up " + taskId);
+
+ // Remove the associated pid-file, if any
+ if (TaskTracker.this.isTaskMemoryManagerEnabled()) {
+ Path pidFilePath = taskMemoryManager.getPidFilePath(taskId);
+ if (pidFilePath != null) {
+ directoryCleanupThread.addToQueue(pidFilePath);
+ }
+ }
+
synchronized (TaskTracker.this) {
if (needCleanup) {
tasks.remove(taskId);
@@ -2240,6 +2267,10 @@
} else {
LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
}
+ // Remove the entry from taskMemoryManagerThread's data structures.
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.removeTask(taskid);
+ }
}
/**
@@ -2702,4 +2733,48 @@
return maxCurrentReduceTasks;
}
+ /**
+ * Is the TaskMemoryManager Enabled on this system?
+ * @return true if enabled, false otherwise.
+ */
+ boolean isTaskMemoryManagerEnabled() {
+ return taskMemoryManagerEnabled;
+ }
+
+  /**
+   * Decide whether the TaskMemoryManager may run. It needs a usable
+   * ProcfsBasedProcessTree and a configured virtual-memory limit for tasks;
+   * otherwise it stays disabled and the reason is logged.
+   */
+  private void setTaskMemoryManagerEnabledFlag() {
+    taskMemoryManagerEnabled = false;
+
+    if (!ProcfsBasedProcessTree.isAvailable()) {
+      LOG.info("ProcessTree implementation is missing on this system. "
+          + "TaskMemoryManager is disabled.");
+      return;
+    }
+
+    // Keep the limit primitive: comparing a boxed Long with == is only a
+    // value comparison by accident of unboxing, and boxing is needless here.
+    long tasksMaxMem = getMaxVirtualMemoryForTasks();
+    if (tasksMaxMem == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
+      LOG.info("TaskTracker's tasksMaxMem is not set. TaskMemoryManager is "
+          + "disabled.");
+      return;
+    }
+
+    taskMemoryManagerEnabled = true;
+  }
+
+  /**
+   * Clean up a task on behalf of the TaskMemoryManagerThread: attach the
+   * diagnostic message to it and purge it as a failure. Does nothing if the
+   * attempt is no longer among the running tasks.
+   *
+   * @param tid attempt to clean up
+   * @param diagnosticMsg explanation recorded in the task's diagnostics
+   */
+  synchronized void cleanUpOverMemoryTask(TaskAttemptID tid,
+      String diagnosticMsg) {
+    TaskInProgress runningTip = runningTasks.get(tid);
+    if (runningTip == null) {
+      return;
+    }
+    runningTip.reportDiagnosticInfo(diagnosticMsg);
+    try {
+      // The second argument marks the purged task as a failure.
+      purgeTask(runningTip, true);
+    } catch (IOException ioe) {
+      LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+    }
+  }
}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Mon Sep 8 21:38:01 2008
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestTaskTrackerMemoryManager extends TestCase {
+
+ private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
+ private MiniDFSCluster miniDFSCluster;
+ private MiniMRCluster miniMRCluster;
+
+ private void startCluster(JobConf conf) throws Exception {
+ miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fileSys = miniDFSCluster.getFileSystem();
+ String namenode = fileSys.getUri().toString();
+ miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
+ }
+
+ @Override
+ protected void tearDown() {
+ if (miniMRCluster != null) {
+ miniMRCluster.shutdown();
+ }
+ if (miniDFSCluster != null) {
+ miniDFSCluster.shutdown();
+ }
+ }
+
+ private void runWordCount(JobConf conf) throws Exception {
+ Path input = new Path("input.txt");
+ Path output = new Path("output");
+
+ OutputStream os = miniDFSCluster.getFileSystem().create(input);
+ Writer wr = new OutputStreamWriter(os);
+ wr.write("hello1\n");
+ wr.write("hello2\n");
+ wr.write("hello3\n");
+ wr.write("hello4\n");
+ wr.close();
+
+ Tool WordCount = new WordCount();
+ if (conf != null) {
+ WordCount.setConf(conf);
+ }
+ ToolRunner.run(WordCount, new String[] { input.toString(),
+ output.toString() });
+ }
+
+ public void testNormalTaskAndLimitedTT() throws Exception {
+ // Run the test only if memory management is enabled
+
+ try {
+ if (!ProcfsBasedProcessTree.isAvailable()) {
+ LOG.info("Currently ProcessTree has only one implementation "
+ + "ProcfsBasedProcessTree, which is not available on this "
+ + "system. Not testing");
+ return;
+ }
+ } catch (Exception e) {
+ LOG.info(StringUtils.stringifyException(e));
+ return;
+ }
+
+ Pattern diagMsgPattern = Pattern
+ .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
+ + "memory-limits. Current usage : [0-9]*. Limit : [0-9]*. Killing task.");
+ Matcher mat = null;
+
+ // Start cluster with proper configuration.
+ JobConf fConf = new JobConf();
+
+ fConf.setMaxVirtualMemoryForTasks(Long.valueOf(10000000000L)); // Fairly large value for WordCount to succeed
+ startCluster(fConf);
+
+ // Set up job.
+ JobConf conf = new JobConf();
+ JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
+ conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+ + jt.getTrackerPort());
+ NameNode nn = miniDFSCluster.getNameNode();
+ conf.set("fs.default.name", "hdfs://"
+ + nn.getNameNodeAddress().getHostName() + ":"
+ + nn.getNameNodeAddress().getPort());
+
+ // Start the job.
+ boolean success = true;
+ try {
+ runWordCount(conf);
+ success = true;
+ } catch (Exception e) {
+ success = false;
+ }
+
+ // Job has to succeed
+ assertTrue(success);
+
+ // Alas, we don't have a way to get job id/Task completion events from
+ // WordCount
+ JobClient jClient = new JobClient(conf);
+ JobStatus[] jStatus = jClient.getAllJobs();
+ JobStatus js = jStatus[0]; // Our only job
+ RunningJob rj = jClient.getJob(js.getJobID());
+
+ // All events
+ TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
+
+ for (TaskCompletionEvent tce : taskComplEvents) {
+ String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
+ .getTaskAttemptId());
+
+ if (diagnostics != null) {
+ for (String str : diagnostics) {
+ mat = diagMsgPattern.matcher(str);
+ // The error pattern shouldn't be there in any TIP's diagnostics
+ assertFalse(mat.find());
+ }
+ }
+ }
+ }
+
+ public void testOOMTaskAndLimitedTT() throws Exception {
+
+ // Run the test only if memory management is enabled
+
+ try {
+ if (!ProcfsBasedProcessTree.isAvailable()) {
+ LOG.info("Currently ProcessTree has only one implementation "
+ + "ProcfsBasedProcessTree, which is not available on this "
+ + "system. Not testing");
+ return;
+ }
+ } catch (Exception e) {
+ LOG.info(StringUtils.stringifyException(e));
+ return;
+ }
+
+ long PER_TASK_LIMIT = 444; // Enough to kill off WordCount.
+ Pattern diagMsgPattern = Pattern
+ .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
+ + "memory-limits. Current usage : [0-9]*. Limit : "
+ + PER_TASK_LIMIT + ". Killing task.");
+ Matcher mat = null;
+
+ // Start cluster with proper configuration.
+ JobConf fConf = new JobConf();
+ fConf.setMaxVirtualMemoryForTasks(Long.valueOf(100000));
+ fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
+ //very small value, so that no task escapes to successful completion.
+ startCluster(fConf);
+
+ // Set up job.
+ JobConf conf = new JobConf();
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+ JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
+ conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+ + jt.getTrackerPort());
+ NameNode nn = miniDFSCluster.getNameNode();
+ conf.set("fs.default.name", "hdfs://"
+ + nn.getNameNodeAddress().getHostName() + ":"
+ + nn.getNameNodeAddress().getPort());
+
+ // Start the job.
+ boolean success = true;
+ try {
+ runWordCount(conf);
+ success = true;
+ } catch (Exception e) {
+ success = false;
+ }
+
+ // Job has to fail
+ assertFalse(success);
+
+ // Alas, we don't have a way to get job id/Task completion events from
+ // WordCount
+ JobClient jClient = new JobClient(conf);
+ JobStatus[] jStatus = jClient.getAllJobs();
+ JobStatus js = jStatus[0]; // Our only job
+ RunningJob rj = jClient.getJob(js.getJobID());
+
+ // All events
+ TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
+
+ for (TaskCompletionEvent tce : taskComplEvents) {
+ // Every task HAS to fail
+ assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce
+ .getTaskStatus() == TaskCompletionEvent.Status.FAILED);
+
+ String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
+ .getTaskAttemptId());
+
+ // Every task HAS to spit out the out-of-memory errors
+ assert (diagnostics != null);
+
+ for (String str : diagnostics) {
+ mat = diagMsgPattern.matcher(str);
+ // Every task HAS to spit out the out-of-memory errors in the same
+ // format. And these are the only diagnostic messages.
+ assertTrue(mat.find());
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=693359&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Mon Sep 8 21:38:01 2008
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+import junit.framework.TestCase;
+
+public class TestProcfsBasedProcessTree extends TestCase {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestProcfsBasedProcessTree.class);
+ private ShellCommandExecutor shexec = null;
+ private String pidFile;
+ private String shellScript;
+ private static final int N = 10; // Controls the RogueTask
+
+ private static final int memoryLimit = 15000000; // bytes
+ private static final long PROCESSTREE_RECONSTRUCTION_INTERVAL =
+ ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL; // msec
+
+ private class RogueTaskThread extends Thread {
+ public void run() {
+ try {
+ String args[] = { "bash", "-c",
+ "echo $$ > " + pidFile + "; sh " + shellScript + " " + N + ";" };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (ExitCodeException ee) {
+ LOG.info("Shell Command exit with a non-zero exit code. " + ee);
+ } catch (IOException ioe) {
+ LOG.info("Error executing shell command " + ioe);
+ } finally {
+ LOG.info("Exit code: " + shexec.getExitCode());
+ }
+ }
+ }
+
+ private String getRogueTaskPID() {
+ File f = new File(pidFile);
+ while (!f.exists()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+
+ // read from pidFile
+ return ProcfsBasedProcessTree.getPidFromPidFile(pidFile);
+ }
+
+ public void testProcessTree() {
+
+ try {
+ if (!ProcfsBasedProcessTree.isAvailable()) {
+ System.out
+ .println("ProcfsBasedProcessTree is not available on this system. Not testing");
+ return;
+ }
+ } catch (Exception e) {
+ LOG.info(StringUtils.stringifyException(e));
+ return;
+ }
+ // create shell script
+ Random rm = new Random();
+ File tempFile = new File(this.getName() + "_shellScript_" + rm.nextInt()
+ + ".sh");
+ tempFile.deleteOnExit();
+ shellScript = tempFile.getName();
+
+ // create pid file
+ tempFile = new File(this.getName() + "_pidFile_" + rm.nextInt() + ".pid");
+ tempFile.deleteOnExit();
+ pidFile = tempFile.getName();
+
+ // write to shell-script
+ try {
+ FileWriter fWriter = new FileWriter(shellScript);
+ fWriter.write(
+ "# rogue task\n" +
+ "sleep 10\n" +
+ "echo hello\n" +
+ "if [ $1 -ne 0 ]\n" +
+ "then\n" +
+ " sh " + shellScript + " $(($1-1))\n" +
+ "fi");
+ fWriter.close();
+ } catch (IOException ioe) {
+ LOG.info("Error: " + ioe);
+ return;
+ }
+
+ Thread t = new RogueTaskThread();
+ t.start();
+ String pid = getRogueTaskPID();
+ LOG.info("Root process pid: " + pid);
+ ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid);
+ p = p.getProcessTree(); // initialize
+ try {
+ while (true) {
+ LOG.info("ProcessTree: " + p.toString());
+ long mem = p.getCumulativeVmem();
+ LOG.info("Memory usage: " + mem + "bytes.");
+ if (mem > memoryLimit) {
+ p.destroy();
+ break;
+ }
+ Thread.sleep(PROCESSTREE_RECONSTRUCTION_INTERVAL);
+ p = p.getProcessTree(); // reconstruct
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted.");
+ }
+
+ assertEquals(false, p.isAlive()); // processtree should should be gone
+ // Not able to join thread sometimes when forking with large N.
+ try {
+ t.join(2000);
+ LOG.info("RogueTaskThread successfully joined.");
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted while joining RogueTaskThread.");
+ }
+ }
+}
\ No newline at end of file