You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:27:25 UTC
svn commit: r1076975 [1/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/util/ mapred/ mapred/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapred/pipes/ test/org/apache/hadoop/mapred/
test/org/apache/hadoop...
Author: omalley
Date: Fri Mar 4 03:27:24 2011
New Revision: 1076975
URL: http://svn.apache.org/viewvc?rev=1076975&view=rev
Log:
commit fe1e8a78747da4e18708387d9efdbe7146e44e7b
Author: Yahoo! <lt...@yahoo-inc.com>
Date: Tue Aug 18 09:12:41 2009 -0700
Apply patch for HADOOP-5488 from http://issues.apache.org/jira/secure/attachment/12414066/hadoop-5488-ydist.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java?rev=1076975&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java Fri Mar 4 03:27:24 2011
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * Process tree related operations
+ */
+public class ProcessTree {
+
+ private static final Log LOG = LogFactory.getLog(ProcessTree.class);
+
+ // Default grace period (ms) between SIGTERM and the follow-up SIGKILL.
+ public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+
+ // Computed once at class load by probing "setsid bash -c 'echo $$'".
+ public static final boolean isSetsidAvailable = isSetsidSupported();
+ // Probe whether setsid(1) works on this machine; false if the probe
+ // command fails with an IOException.
+ private static boolean isSetsidSupported() {
+ ShellCommandExecutor shexec = null;
+ boolean setsidSupported = true;
+ try {
+ String[] args = {"setsid", "bash", "-c", "echo $$"};
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("setsid is not available on this machine. So not using it.");
+ setsidSupported = false;
+ } finally { // handle the exit code
+ // NOTE(review): 'return' inside finally discards any RuntimeException/
+ // Error thrown from the try block; also, if the executor was never
+ // constructed, shexec is null here and getExitCode() would NPE —
+ // TODO confirm and consider moving the return out of finally.
+ LOG.info("setsid exited with exit code " + shexec.getExitCode());
+ return setsidSupported;
+ }
+ }
+
+ /**
+ * Kills the process(OR process group) by sending the signal SIGKILL
+ * in the current thread
+ * @param pid Process id(OR process group id) of to-be-deleted-process
+ * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+ * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
+ * sending SIGTERM
+ */
+ private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
+ long sleepTimeBeforeSigKill) {
+ // Kill the subprocesses of root process(even if the root process is not
+ // alive) if process group is to be killed.
+ if (isProcessGroup || ProcessTree.isAlive(pid)) {
+ try {
+ // Sleep for some time before sending SIGKILL
+ Thread.sleep(sleepTimeBeforeSigKill);
+ } catch (InterruptedException i) {
+ // NOTE(review): interrupt status is not restored here — TODO consider
+ // Thread.currentThread().interrupt().
+ LOG.warn("Thread sleep is interrupted.");
+ }
+
+ ShellCommandExecutor shexec = null;
+
+ try {
+ String pid_pgrpid;
+ if(isProcessGroup) {//kill the whole process group
+ // "kill -9 -<pgrpid>": a negative pid targets the process group.
+ pid_pgrpid = "-" + pid;
+ }
+ else {//kill single process
+ pid_pgrpid = pid;
+ }
+
+ String[] args = { "kill", "-9", pid_pgrpid };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command " + ioe);
+ } finally {
+ // NOTE(review): shexec could still be null here if construction failed
+ // before assignment, which would NPE in getExitCode() — TODO confirm.
+ if(isProcessGroup) {
+ // NOTE(review): missing space after "group" in this log literal.
+ LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
+ + shexec.getExitCode());
+ }
+ else {
+ LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+ + shexec.getExitCode());
+ }
+ }
+ }
+ }
+
+ /** Kills the process(OR process group) by sending the signal SIGKILL
+ * @param pid Process id(OR process group id) of to-be-deleted-process
+ * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ private static void sigKill(String pid, boolean isProcessGroup,
+ long sleeptimeBeforeSigkill, boolean inBackground) {
+
+ if(inBackground) { // use a separate thread for killing
+ SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
+ sleeptimeBeforeSigkill);
+ sigKillThread.setDaemon(true);
+ sigKillThread.start();
+ }
+ else {
+ sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+ }
+ }
+
+ /** Destroy the process.
+ * Sends SIGTERM first, then (directly or via a daemon thread) SIGKILL
+ * after the configured grace period.
+ * @param pid Process id of to-be-killed-process
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
+ boolean inBackground) {
+ ShellCommandExecutor shexec = null;
+ try {
+ // "kill <pid>" with no signal argument delivers the default SIGTERM.
+ String[] args = { "kill", pid };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command " + ioe);
+ } finally {
+ // NOTE(review): potential NPE if shexec is null (see similar blocks).
+ LOG.info("Killing process " + pid +
+ " with SIGTERM. Exit code " + shexec.getExitCode());
+ }
+
+ sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+ }
+
+ /** Destroy the process group.
+ * Sends SIGTERM to the whole group, then SIGKILL after the grace period.
+ * @param pgrpId Process group id of to-be-killed-processes
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process group is to be killed in the back ground with
+ * a separate thread
+ */
+ protected static void destroyProcessGroup(String pgrpId,
+ long sleeptimeBeforeSigkill, boolean inBackground) {
+ ShellCommandExecutor shexec = null;
+ try {
+ // "--" ends option parsing so the negative group id is not read as a flag.
+ String[] args = { "kill", "--", "-" + pgrpId };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command " + ioe);
+ } finally {
+ // NOTE(review): potential NPE if shexec is null (see similar blocks).
+ LOG.info("Killing all processes in the process group " + pgrpId +
+ " with SIGTERM. Exit code " + shexec.getExitCode());
+ }
+
+ sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+ }
+
+ /**
+ * Destroy the process-tree.
+ * Dispatches to group kill or single-process kill based on isProcessGroup.
+ * @param pid process id of the root process of the subtree of processes
+ * to be killed
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param isProcessGroup pid is a process group leader or not
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ public static void destroy(String pid, long sleeptimeBeforeSigkill,
+ boolean isProcessGroup, boolean inBackground) {
+ if(isProcessGroup) {
+ destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+ }
+ else {
+ //TODO: Destroy all the processes in the subtree in this case also.
+ // For the time being, killing only the root process.
+ destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+ }
+ }
+
+
+ /**
+ * Is the process with PID pid still alive?
+ * This method assumes that isAlive is called on a pid that was alive not
+ * too long ago, and hence assumes no chance of pid-wrapping-around.
+ */
+ public static boolean isAlive(String pid) {
+ ShellCommandExecutor shexec = null;
+ try {
+ // "kill -0 <pid>" probes existence: signal 0 delivers nothing but
+ // fails (non-zero exit) if the process does not exist.
+ String[] args = { "kill", "-0", pid };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (ExitCodeException ee) {
+ // Non-zero exit from kill -0 => no such process.
+ return false;
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command "
+ + Arrays.toString(shexec.getExecString()) + ioe);
+ return false;
+ }
+ // NOTE(review): redundant ternary — "shexec.getExitCode() == 0" suffices.
+ return (shexec.getExitCode() == 0 ? true : false);
+ }
+
+ /**
+ * Helper thread class that kills process-tree with SIGKILL in background
+ */
+ static class SigKillThread extends Thread {
+ private String pid = null;
+ private boolean isProcessGroup = false;
+
+ private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+
+ private SigKillThread(String pid, boolean isProcessGroup, long interval) {
+ this.pid = pid;
+ this.isProcessGroup = isProcessGroup;
+ // Name the thread after the target pid for easier thread-dump reading.
+ this.setName(this.getClass().getName() + "-" + pid);
+ sleepTimeBeforeSigKill = interval;
+ }
+
+ // Delegates to the synchronous kill path on this daemon thread.
+ public void run() {
+ sigKillInCurrentThread(pid, isProcessGroup, sleepTimeBeforeSigKill);
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Fri Mar 4 03:27:24 2011
@@ -41,10 +41,10 @@ import org.apache.hadoop.util.Shell.Shel
/**
* A Proc file-system based ProcessTree. Works only on Linux.
*/
-public class ProcfsBasedProcessTree {
+public class ProcfsBasedProcessTree extends ProcessTree {
private static final Log LOG = LogFactory
- .getLog("org.apache.hadoop.mapred.ProcfsBasedProcessTree");
+ .getLog(ProcfsBasedProcessTree.class);
private static final String PROCFS = "/proc/";
public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
@@ -58,10 +58,21 @@ public class ProcfsBasedProcessTree {
private Integer pid = -1;
+ private boolean setsidUsed = false;
+
+ private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+
private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
- this(pid, PROCFS);
+ this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ }
+
+ public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
+ long sigkillInterval) {
+ this(pid,PROCFS);
+ this.setsidUsed = setsidUsed;
+ sleeptimeBeforeSigkill = sigkillInterval;
}
public ProcfsBasedProcessTree(String pid, String procfsDir) {
@@ -69,6 +80,13 @@ public class ProcfsBasedProcessTree {
this.procfsDir = procfsDir;
}
+ /**
+ * Sets SIGKILL interval
+ * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
+ * String, boolean, long)} instead
+ * @param interval The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ */
public void setSigKillInterval(long interval) {
sleepTimeBeforeSigKill = interval;
}
@@ -170,47 +188,107 @@ public class ProcfsBasedProcessTree {
}
/**
- * Is the process-tree alive? Currently we care only about the status of the
- * root-process.
+ * Is the root-process alive?
*
- * @return true if the process-true is alive, false otherwise.
+ * @return true if the root-process is alive, false otherwise.
*/
public boolean isAlive() {
if (pid == -1) {
return false;
} else {
- return this.isAlive(pid);
+ return isAlive(pid.toString());
+ }
+ }
+
+ /**
+ * Is any of the subprocesses in the process-tree alive?
+ *
+ * @return true if any of the processes in the process-tree is
+ * alive, false otherwise.
+ */
+ public boolean isAnyProcessInTreeAlive() {
+ for (Integer pId : processTree.keySet()) {
+ if (isAlive(pId.toString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Verify that the given process id is same as its process group id.
+ * @param pidStr Process id of the to-be-verified-process
+ */
+ private static boolean assertPidPgrpidForMatch(String pidStr) {
+ Integer pId = Integer.parseInt(pidStr);
+ // Get information for this process
+ ProcessInfo pInfo = new ProcessInfo(pId);
+ pInfo = constructProcessInfo(pInfo);
+ //make sure that pId and its pgrpId match
+ if (!pInfo.getPgrpId().equals(pId)) {
+ LOG.warn("Unexpected: Process with PID " + pId +
+ " is not a process group leader.");
+ return false;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(pId + " is a process group leader, as expected.");
+ }
+ return true;
+ }
+
+ /** Make sure that the given pid is a process group leader and then
+ * destroy the process group.
+ * @param pgrpId Process group id of to-be-killed-processes
+ * @param interval The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
+ boolean inBackground)
+ throws IOException {
+ // Make sure that the pid given is a process group leader
+ if (!assertPidPgrpidForMatch(pgrpId)) {
+ throw new IOException("Process with PID " + pgrpId +
+ " is not a process group leader.");
+ }
+ destroyProcessGroup(pgrpId, interval, inBackground);
}
/**
- * Destroy the process-tree. Currently we only make sure the root process is
- * gone. It is the responsibility of the root process to make sure that all
- * its descendants are cleaned up.
+ * Destroy the process-tree.
*/
public void destroy() {
+ destroy(true);
+ }
+
+ /**
+ * Destroy the process-tree.
+ * @param inBackground Process is to be killed in the back ground with
+ * a separate thread
+ */
+ public void destroy(boolean inBackground) {
LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
if (pid == -1) {
return;
}
- ShellCommandExecutor shexec = null;
- if (isAlive(this.pid)) {
- try {
- String[] args = { "kill", this.pid.toString() };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing " + pid + " with SIGTERM. Exit code "
- + shexec.getExitCode());
+ if (isAlive(pid.toString())) {
+ if (isSetsidAvailable && setsidUsed) {
+ // In this case, we know that pid got created using setsid. So kill the
+ // whole processGroup.
+ try {
+ assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
+ inBackground);
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+ else {
+ //TODO: Destroy all the processes in the subtree in this case also.
+ // For the time being, killing only the root process.
+ destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
}
}
-
- SigKillThread sigKillThread = new SigKillThread();
- sigKillThread.setDaemon(true);
- sigKillThread.start();
}
/**
@@ -243,52 +321,7 @@ public class ProcfsBasedProcessTree {
return total;
}
- /**
- * Get PID from a pid-file.
- *
- * @param pidFileName
- * Name of the pid-file.
- * @return the PID string read from the pid-file. Returns null if the
- * pidFileName points to a non-existing file or if read fails from the
- * file.
- */
- public static String getPidFromPidFile(String pidFileName) {
- BufferedReader pidFile = null;
- FileReader fReader = null;
- String pid = null;
-
- try {
- fReader = new FileReader(pidFileName);
- pidFile = new BufferedReader(fReader);
- } catch (FileNotFoundException f) {
- LOG.debug("PidFile doesn't exist : " + pidFileName);
- return pid;
- }
-
- try {
- pid = pidFile.readLine();
- } catch (IOException i) {
- LOG.error("Failed to read from " + pidFileName);
- } finally {
- try {
- if (fReader != null) {
- fReader.close();
- }
- try {
- if (pidFile != null) {
- pidFile.close();
- }
- } catch (IOException i) {
- LOG.warn("Error closing the stream " + pidFile);
- }
- } catch (IOException i) {
- LOG.warn("Error closing the stream " + fReader);
- }
- }
- return pid;
- }
-
- private Integer getValidPID(String pid) {
+ private static Integer getValidPID(String pid) {
Integer retPid = -1;
try {
retPid = Integer.parseInt((String) pid);
@@ -328,7 +361,7 @@ public class ProcfsBasedProcessTree {
* Construct the ProcessInfo using the process' PID and procfs and return the
* same. Returns null on failing to read from procfs,
*/
- private ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
return constructProcessInfo(pinfo, PROCFS);
}
@@ -343,7 +376,7 @@ public class ProcfsBasedProcessTree {
* @param procfsDir root of the proc file system
* @return updated ProcessInfo, null on errors.
*/
- private ProcessInfo constructProcessInfo(ProcessInfo pinfo,
+ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo,
String procfsDir) {
ProcessInfo ret = null;
// Read "procfsDir/<pid>/stat" file
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:27:24 2011
@@ -198,12 +198,10 @@
</property>
<property>
- <name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name>
+ <name>mapred.tasktracker.tasks.sleeptime-before-sigkill</name>
<value>5000</value>
<description>The time, in milliseconds, the tasktracker waits for sending a
- SIGKILL to a process that has overrun memory limits, after it has been sent
- a SIGTERM. Used only if tasks' memory management is enabled via
- mapred.tasktracker.tasks.maxmemory.</description>
+ SIGKILL to a process, after it has been sent a SIGTERM.</description>
</property>
<property>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 03:27:24 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.Metrics
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.log4j.LogManager;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
/**
@@ -45,7 +46,7 @@ import org.apache.hadoop.util.StringUtil
class Child {
public static final Log LOG =
- LogFactory.getLog(TaskTracker.class);
+ LogFactory.getLog(Child.class);
static volatile TaskAttemptID taskid = null;
static volatile boolean isCleanup;
@@ -99,18 +100,18 @@ class Child {
t.setName("Thread for syncLogs");
t.setDaemon(true);
t.start();
- //for the memory management, a PID file is written and the PID file
- //is written once per JVM. We simply symlink the file on a per task
- //basis later (see below). Long term, we should change the Memory
- //manager to use JVMId instead of TaskAttemptId
- Path srcPidPath = null;
- Path dstPidPath = null;
+
+ String pid = "";
+ if (!Shell.WINDOWS) {
+ pid = System.getenv().get("JVM_PID");
+ }
+ JvmContext context = new JvmContext(jvmId, pid);
int idleLoopCount = 0;
Task task = null;
try {
while (true) {
taskid = null;
- JvmTask myTask = umbilical.getTask(jvmId);
+ JvmTask myTask = umbilical.getTask(context);
if (myTask.shouldDie()) {
break;
} else {
@@ -137,18 +138,6 @@ class Child {
//are viewable immediately
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
JobConf job = new JobConf(task.getJobFile());
- if (job.getBoolean("task.memory.mgmt.enabled", false)) {
- if (srcPidPath == null) {
- srcPidPath = new Path(task.getPidFile());
- }
- //since the JVM is running multiple tasks potentially, we need
- //to do symlink stuff only for the subsequent tasks
- if (!taskid.equals(firstTaskid)) {
- dstPidPath = new Path(task.getPidFile());
- FileUtil.symLink(srcPidPath.toUri().getPath(),
- dstPidPath.toUri().getPath());
- }
- }
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
@@ -170,11 +159,6 @@ class Child {
task.run(job, umbilical); // run the task
} finally {
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
- if (!taskid.equals(firstTaskid) &&
- job.getBoolean("task.memory.mgmt.enabled", false)) {
- // delete the pid-file's symlink
- new File(dstPidPath.toUri().getPath()).delete();
- }
}
if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
break;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 03:27:24 2011
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -51,7 +53,7 @@ class DefaultTaskController extends Task
JvmEnv env = context.env;
List<String> wrappedCommand =
TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, env.pidFile);
+ env.logSize, true);
ShellCommandExecutor shexec =
new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
env.workDir, env.env);
@@ -68,13 +70,34 @@ class DefaultTaskController extends Task
*/
void killTaskJVM(TaskController.TaskControllerContext context) {
ShellCommandExecutor shexec = context.shExec;
+
if (shexec != null) {
Process process = shexec.getProcess();
- if (process != null) {
- process.destroy();
+ if (Shell.WINDOWS) {
+ // Currently we don't use setsid on WINDOWS. So kill the process alone.
+ if (process != null) {
+ process.destroy();
+ }
+ }
+ else { // In addition to the task JVM, kill its subprocesses also.
+ String pid = context.pid;
+ if (pid != null) {
+ ProcessTree.destroy(pid, context.sleeptimeBeforeSigkill,
+ ProcessTree.isSetsidAvailable, false);
+ try {
+ if (process != null) {
+ LOG.info("Process exited with exit code:" + process.waitFor());
+ }
+ } catch (InterruptedException ie) {}
+ }
+ else if (process != null) {
+ // kill the task JVM alone, if we don't have the process group id
+ process.destroy();
+ }
}
}
}
+
/**
* Initialize the task environment.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar 4 03:27:24 2011
@@ -63,7 +63,7 @@ public class IsolationRunner {
LOG.info("Task " + taskId + " reporting fatal error: " + msg);
}
- public JvmTask getTask(JVMId jvmId) throws IOException {
+ public JvmTask getTask(JvmContext context) throws IOException {
return null;
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java?rev=1076975&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmContext.java Fri Mar 4 03:27:24 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+// Writable pair of (JVMId, OS pid) sent from a task JVM to the TaskTracker
+// over the umbilical protocol (see Child building JvmContext(jvmId, pid)
+// from the JVM_PID env var and passing it to umbilical.getTask(context)).
+class JvmContext implements Writable {
+
+ public static final Log LOG =
+ LogFactory.getLog(JvmContext.class);
+
+ // Logical JVM identifier assigned by the TaskTracker.
+ JVMId jvmId;
+ // OS process id string; empty when unknown (e.g. on Windows, per Child).
+ String pid;
+
+ // No-arg constructor for Writable deserialization.
+ JvmContext() {
+ jvmId = new JVMId();
+ pid = "";
+ }
+
+ JvmContext(JVMId id, String pid) {
+ jvmId = id;
+ this.pid = pid;
+ }
+
+ // Deserialization order must mirror write(): jvmId first, then pid.
+ public void readFields(DataInput in) throws IOException {
+ jvmId.readFields(in);
+ this.pid = Text.readString(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ jvmId.write(out);
+ Text.writeString(out, pid);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 03:27:24 2011
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
class JvmManager {
@@ -46,8 +47,8 @@ class JvmManager {
public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
File stdout,File stderr,long logSize, File workDir,
- Map<String,String> env, String pidFile, JobConf conf) {
- return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
+ Map<String,String> env, JobConf conf) {
+ return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
}
public JvmManager(TaskTracker tracker) {
@@ -57,6 +58,39 @@ class JvmManager {
false, tracker);
}
+ /*
+ * Saves pid of the given taskJvm
+ */
+ void setPidToJvm(JVMId jvmId, String pid) {
+ if (jvmId.isMapJVM()) {
+ mapJvmManager.jvmIdToPid.put(jvmId, pid);
+ }
+ else {
+ reduceJvmManager.jvmIdToPid.put(jvmId, pid);
+ }
+ }
+
+ /*
+ * Returns the pid of the task
+ */
+ String getPid(TaskRunner t) {
+ if (t != null && t.getTask() != null) {
+ if (t.getTask().isMapTask()) {
+ JVMId id = mapJvmManager.runningTaskToJvm.get(t);
+ if (id != null) {
+ return mapJvmManager.jvmIdToPid.get(id);
+ }
+ } else {
+ JVMId id = reduceJvmManager.runningTaskToJvm.get(t);
+ if (id != null) {
+ return reduceJvmManager.jvmIdToPid.get(id);
+ }
+ }
+ }
+ return null;
+ }
+
+
public void stop() {
mapJvmManager.stop();
reduceJvmManager.stop();
@@ -119,6 +153,10 @@ class JvmManager {
//Mapping from the JVM IDs to Reduce JVM processes
Map <JVMId, JvmRunner> jvmIdToRunner =
new HashMap<JVMId, JvmRunner>();
+ //Mapping from the JVM IDs to process IDs
+ Map <JVMId, String> jvmIdToPid =
+ new HashMap<JVMId, String>();
+
int maxJvms;
boolean isMap;
@@ -208,6 +246,7 @@ class JvmManager {
synchronized private void removeJvm(JVMId jvmId) {
jvmIdToRunner.remove(jvmId);
+ jvmIdToPid.remove(jvmId);
}
private synchronized void reapJvm(
TaskRunner t, JvmEnv env) {
@@ -314,13 +353,13 @@ class JvmManager {
jvmRunner.start();
}
synchronized private void updateOnJvmExit(JVMId jvmId,
- int exitCode, boolean killed) {
+ int exitCode) {
removeJvm(jvmId);
TaskRunner t = jvmToRunningTask.remove(jvmId);
if (t != null) {
runningTaskToJvm.remove(t);
- if (!killed && exitCode != 0) {
+ if (exitCode != 0) {
t.setExitCode(exitCode);
}
t.signalDone();
@@ -364,8 +403,11 @@ class JvmManager {
if (shexec == null) {
return;
}
+
+ kill();
+
int exitCode = shexec.getExitCode();
- updateOnJvmExit(jvmId, exitCode, killed);
+ updateOnJvmExit(jvmId, exitCode);
LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
numTasksRan);
try {
@@ -381,16 +423,23 @@ class JvmManager {
}
public void kill() {
- TaskController controller = tracker.getTaskController();
- //Check inital context before issuing a kill to prevent situations
- //where kill is issued before task is launched.
- if(initalContext != null && initalContext.env != null) {
- controller.killTaskJVM(initalContext);
- } else {
- LOG.info(String.format("JVM Not killed %s but just removed",
- jvmId.toString()));
+ if (!killed) {
+ killed = true;
+ TaskController controller = tracker.getTaskController();
+ // Check initial context before issuing a kill to prevent situations
+ // where kill is issued before task is launched.
+ if (initalContext != null && initalContext.env != null) {
+ initalContext.pid = jvmIdToPid.get(jvmId);
+ initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
+ .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
+ ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ controller.killTaskJVM(initalContext);
+ } else {
+ LOG.info(String.format("JVM Not killed %s but just removed", jvmId
+ .toString()));
+ }
+ removeJvm(jvmId);
}
- removeJvm(jvmId);
}
public void taskRan() {
@@ -415,21 +464,19 @@ class JvmManager {
File stdout;
File stderr;
File workDir;
- String pidFile;
long logSize;
JobConf conf;
Map<String, String> env;
public JvmEnv(List<String> setup, Vector<String> vargs, File stdout,
File stderr, long logSize, File workDir, Map<String,String> env,
- String pidFile, JobConf conf) {
+ JobConf conf) {
this.setup = setup;
this.vargs = vargs;
this.stdout = stdout;
this.stderr = stderr;
this.workDir = workDir;
this.env = env;
- this.pidFile = pidFile;
this.conf = conf;
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:27:24 2011
@@ -126,7 +126,7 @@ class LinuxTaskController extends TaskCo
// get the JVM command line.
String cmdLine =
TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, env.pidFile);
+ env.logSize, true);
StringBuffer sb = new StringBuffer();
//export out all the environment variable before child command.
for(Entry<String, String> entry : env.env.entrySet()) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:27:24 2011
@@ -270,7 +270,7 @@ class LocalJobRunner implements JobSubmi
// TaskUmbilicalProtocol methods
- public JvmTask getTask(JVMId jvmId) { return null; }
+ public JvmTask getTask(JvmContext context) { return null; }
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:27:24 2011
@@ -44,7 +44,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
@@ -59,7 +58,7 @@ import org.apache.hadoop.util.StringUtil
*/
abstract public class Task implements Writable, Configurable {
private static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
+ LogFactory.getLog(Task.class);
// Counters used by Task subclasses
protected static enum Counter {
@@ -187,12 +186,7 @@ abstract public class Task implements Wr
}
Counters getCounters() { return counters; }
- public void setPidFile(String pidFile) {
- this.pidFile = pidFile;
- }
- public String getPidFile() {
- return pidFile;
- }
+
/**
* Get the job name for this task.
@@ -351,7 +345,6 @@ abstract public class Task implements Wr
Text.writeString(out, username);
out.writeBoolean(writeSkipRecs);
out.writeBoolean(taskCleanup);
- Text.writeString(out, pidFile);
}
public void readFields(DataInput in) throws IOException {
@@ -373,7 +366,6 @@ abstract public class Task implements Wr
if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP);
}
- pidFile = Text.readString(in);
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:27:24 2011
@@ -22,9 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
@@ -99,6 +97,10 @@ abstract class TaskController implements
JvmEnv env;
// the Shell executor executing the JVM for this task
ShellCommandExecutor shExec;
+ // process handle of task JVM
+ String pid;
+ // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+ long sleeptimeBeforeSigkill;
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar 4 03:27:24 2011
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
*/
public class TaskLog {
private static final Log LOG =
- LogFactory.getLog(TaskLog.class.getName());
+ LogFactory.getLog(TaskLog.class);
private static final File LOG_DIR =
new File(System.getProperty("hadoop.log.dir"),
@@ -403,7 +405,7 @@ public class TaskLog {
long tailLength
) throws IOException {
return captureOutAndError(null, cmd, stdoutFilename,
- stderrFilename, tailLength, null );
+ stderrFilename, tailLength, false);
}
/**
@@ -425,7 +427,7 @@ public class TaskLog {
long tailLength
) throws IOException {
return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
- tailLength, null);
+ tailLength, false);
}
/**
@@ -438,9 +440,11 @@ public class TaskLog {
* @param stdoutFilename The filename that stdout should be saved to
* @param stderrFilename The filename that stderr should be saved to
* @param tailLength The length of the tail to be saved.
- * @param pidFileName The name of the pid-file
+ * @deprecated pidFiles are no longer used. Instead the pid is exported to
+ * the env variable JVM_PID.
* @return the modified command that should be run
*/
+ @Deprecated
public static List<String> captureOutAndError(List<String> setup,
List<String> cmd,
File stdoutFilename,
@@ -448,15 +452,68 @@ public class TaskLog {
long tailLength,
String pidFileName
) throws IOException {
+ return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
+ tailLength, false, pidFileName);
+ }
+
+ /**
+ * Wrap a command in a shell to capture stdout and stderr to files.
+ * Setup commands such as setting memory limit can be passed which
+ * will be executed before exec.
+ * If the tailLength is 0, the entire output will be saved.
+ * @param setup The setup commands for the execed process.
+ * @param cmd The command and the arguments that should be run
+ * @param stdoutFilename The filename that stdout should be saved to
+ * @param stderrFilename The filename that stderr should be saved to
+ * @param tailLength The length of the tail to be saved.
+ * @param useSetsid Should setsid be used in the command or not.
+ * @deprecated pidFiles are no longer used. Instead the pid is exported to
+ * the env variable JVM_PID.
+ * @return the modified command that should be run
+ *
+ */
+ @Deprecated
+ public static List<String> captureOutAndError(List<String> setup,
+ List<String> cmd,
+ File stdoutFilename,
+ File stderrFilename,
+ long tailLength,
+ boolean useSetsid,
+ String pidFileName
+ ) throws IOException {
+ return captureOutAndError(setup,cmd, stdoutFilename, stderrFilename, tailLength,
+ useSetsid);
+ }
+
+ /**
+ * Wrap a command in a shell to capture stdout and stderr to files.
+ * Setup commands such as setting memory limit can be passed which
+ * will be executed before exec.
+ * If the tailLength is 0, the entire output will be saved.
+ * @param setup The setup commands for the execed process.
+ * @param cmd The command and the arguments that should be run
+ * @param stdoutFilename The filename that stdout should be saved to
+ * @param stderrFilename The filename that stderr should be saved to
+ * @param tailLength The length of the tail to be saved.
+ * @param useSetsid Should setsid be used in the command or not.
+ * @return the modified command that should be run
+ */
+ public static List<String> captureOutAndError(List<String> setup,
+ List<String> cmd,
+ File stdoutFilename,
+ File stderrFilename,
+ long tailLength,
+ boolean useSetsid
+ ) throws IOException {
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
String mergedCmd = buildCommandLine(setup, cmd,
stdoutFilename,
stderrFilename, tailLength,
- pidFileName);
+ useSetsid);
result.add(mergedCmd.toString());
- return result;
+ return result;
}
@@ -464,17 +521,15 @@ public class TaskLog {
List<String> cmd,
File stdoutFilename,
File stderrFilename,
- long tailLength,
- String pidFileName) throws IOException {
+ long tailLength,
+ boolean useSetSid) throws IOException {
String stdout = FileUtil.makeShellPath(stdoutFilename);
String stderr = FileUtil.makeShellPath(stderrFilename);
StringBuffer mergedCmd = new StringBuffer();
- // Spit out the pid to pidFileName
- if (pidFileName != null) {
- mergedCmd.append("echo $$ > ");
- mergedCmd.append(pidFileName);
- mergedCmd.append(" ;");
+
+ if (!Shell.WINDOWS) {
+ mergedCmd.append(" export JVM_PID=`echo $$` ; ");
}
if (setup != null && setup.size() > 0) {
@@ -483,6 +538,9 @@ public class TaskLog {
}
if (tailLength > 0) {
mergedCmd.append("(");
+ } else if (ProcessTree.isSetsidAvailable && useSetSid
+ && !Shell.WINDOWS) {
+ mergedCmd.append("exec setsid ");
} else {
mergedCmd.append("exec ");
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Fri Mar 4 03:27:24 2011
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
@@ -42,7 +43,6 @@ class TaskMemoryManagerThread extends Th
private TaskTracker taskTracker;
private long monitoringInterval;
- private long sleepTimeBeforeSigKill;
private long maxMemoryAllowedForAllTasks;
@@ -55,10 +55,7 @@ class TaskMemoryManagerThread extends Th
this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval",
- 5000L),
- taskTracker.getJobConf().getLong(
- "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill",
- ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL));
+ 5000L));
this.taskTracker = taskTracker;
}
@@ -66,8 +63,7 @@ class TaskMemoryManagerThread extends Th
// mainly for test purposes. note that the tasktracker variable is
// not set here.
TaskMemoryManagerThread(long maxMemoryAllowedForAllTasks,
- long monitoringInterval,
- long sleepTimeBeforeSigKill) {
+ long monitoringInterval) {
setName(this.getClass().getName());
processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
@@ -77,15 +73,12 @@ class TaskMemoryManagerThread extends Th
this.maxMemoryAllowedForAllTasks = maxMemoryAllowedForAllTasks;
this.monitoringInterval = monitoringInterval;
-
- this.sleepTimeBeforeSigKill = sleepTimeBeforeSigKill;
}
- public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
+ public void addTask(TaskAttemptID tid, long memLimit) {
synchronized (tasksToBeAdded) {
LOG.debug("Tracking ProcessTree " + tid + " for the first time");
- ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
- sleepTimeBeforeSigKill, pidFile);
+ ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit);
tasksToBeAdded.put(tid, ptInfo);
}
}
@@ -104,16 +97,11 @@ class TaskMemoryManagerThread extends Th
private String pidFile;
public ProcessTreeInfo(TaskAttemptID tid, String pid,
- ProcfsBasedProcessTree pTree, long memLimit,
- long sleepTimeBeforeSigKill, String pidFile) {
+ ProcfsBasedProcessTree pTree, long memLimit) {
this.tid = tid;
this.pid = pid;
this.pTree = pTree;
- if (this.pTree != null) {
- this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
- }
this.memLimit = memLimit;
- this.pidFile = pidFile;
}
public TaskAttemptID getTID() {
@@ -184,8 +172,8 @@ class TaskMemoryManagerThread extends Th
// Initialize any uninitialized processTrees
if (pId == null) {
- // get pid from pid-file
- pId = getPid(ptInfo.pidFile);
+ // get pid from taskAttemptId
+ pId = taskTracker.getPid(ptInfo.getTID());
if (pId != null) {
// PID will be null, either if the pid file is yet to be created
// or if the tip is finished and we removed pidFile, but the TIP
@@ -193,7 +181,12 @@ class TaskMemoryManagerThread extends Th
// transmission to JT
// create process tree object
- ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
+ long sleeptimeBeforeSigkill = taskTracker.getJobConf().getLong(
+ "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+ ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+ ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(
+ pId,ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
LOG.debug("Tracking ProcessTree " + pId + " for the first time");
ptInfo.setPid(pId);
@@ -233,7 +226,7 @@ class TaskMemoryManagerThread extends Th
taskTracker.cleanUpOverMemoryTask(tid, true, msg);
// Now destroy the ProcessTree, remove it from monitoring map.
- pTree.destroy();
+ pTree.destroy(true/*in the background*/);
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
} else {
@@ -368,7 +361,7 @@ class TaskMemoryManagerThread extends Th
// Now destroy the ProcessTree, remove it from monitoring map.
ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
- pTree.destroy();
+ pTree.destroy(true/*in the background*/);
processTreeInfoMap.remove(tid);
LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
}
@@ -377,17 +370,4 @@ class TaskMemoryManagerThread extends Th
+ "But found no alive task to kill for freeing memory.");
}
}
-
- /**
- * Load pid of the task from the pidFile.
- *
- * @param pidFileName
- * @return the pid of the task process.
- */
- private String getPid(String pidFileName) {
- if ((new File(pidFileName)).exists()) {
- return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName);
- }
- return null;
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:27:24 2011
@@ -368,13 +368,8 @@ abstract class TaskRunner extends Thread
vargs.add(address.getAddress().getHostAddress());
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid.toString()); // pass task identifier
-
- String pidFile = lDirAlloc.getLocalPathForWrite(
- (TaskTracker.getPidFile(t.getJobID().toString(),
- taskid.toString(), t.isTaskCleanupTask())),
- this.conf).toString();
- t.setPidFile(pidFile);
- tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);
+
+ tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -443,7 +438,7 @@ abstract class TaskRunner extends Thread
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
- workDir, env, pidFile, conf));
+ workDir, env, conf));
synchronized (lock) {
while (!done) {
lock.wait();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:27:24 2011
@@ -199,7 +199,6 @@ public class TaskTracker
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
- private static final String PID = "pid";
private static final String OUTPUT = "output";
private JobConf originalConf;
private JobConf fConf;
@@ -419,13 +418,14 @@ public class TaskTracker
return taskDir;
}
- static String getPidFile(String jobid,
- String taskid,
- boolean isCleanup) {
- return getLocalTaskDir(jobid, taskid, isCleanup)
- + Path.SEPARATOR + PID;
+ String getPid(TaskAttemptID tid) {
+ TaskInProgress tip = tasks.get(tid);
+ if (tip != null) {
+ return jvmManager.getPid(tip.getTaskRunner());
+ }
+ return null;
}
-
+
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -760,7 +760,7 @@ public class TaskTracker
}
}
- private LocalDirAllocator lDirAlloc =
+ private static LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
// intialize the job directory
@@ -1742,13 +1742,12 @@ public class TaskTracker
}
void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
- JobConf conf,
- String pidFile) {
+ JobConf conf) {
if (isTaskMemoryManagerEnabled()) {
taskMemoryManager.addTask(attemptId,
isMap ? conf
.getMemoryForMapTask() * 1024 * 1024L : conf
- .getMemoryForReduceTask() * 1024 * 1024L, pidFile);
+ .getMemoryForReduceTask() * 1024 * 1024L);
}
}
@@ -2534,9 +2533,12 @@ public class TaskTracker
/**
* Called upon startup by the child process, to fetch Task data.
*/
- public synchronized JvmTask getTask(JVMId jvmId)
+ public synchronized JvmTask getTask(JvmContext context)
throws IOException {
+ JVMId jvmId = context.jvmId;
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
+ // save pid of task JVM sent by child
+ jvmManager.setPidToJvm(jvmId, context.pid);
if (!jvmManager.isJvmKnown(jvmId)) {
LOG.info("Killing unknown JVM " + jvmId);
return new JvmTask(null, true);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Mar 4 03:27:24 2011
@@ -55,17 +55,19 @@ interface TaskUmbilicalProtocol extends
* Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
* Version 16 Added fatalError for child to communicate fatal errors to TT
* Version 16 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+ * Version 17 Change in signature of getTask() for HADOOP-5488
* */
- public static final long versionID = 16L;
+ public static final long versionID = 17L;
/**
* Called when a child task process starts, to get its task.
- * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
+ * @param context the JvmContext of the JVM w.r.t the TaskTracker that
+ * launched it
* @return Task object
* @throws IOException
*/
- JvmTask getTask(JVMId jvmId) throws IOException;
+ JvmTask getTask(JvmContext context) throws IOException;
/**
* Report child's progress to parent.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Fri Mar 4 03:27:24 2011
@@ -96,7 +96,8 @@ class Application<K1 extends WritableCom
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);
- cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+ cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
+ false);
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1076975&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Fri Mar 4 03:27:24 2011
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A JUnit test to test Kill Job that has tasks with children and checks if the
+ * children(subprocesses of java task) are also killed when a task is killed.
+ */
+public class TestKillSubProcesses extends TestCase {
+
+ private static volatile Log LOG = LogFactory
+ .getLog(TestKillSubProcesses.class);
+
+ private static String TEST_ROOT_DIR = new File(System.getProperty(
+ "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+ private static JobClient jobClient = null;
+
+ private static MiniMRCluster mr = null;
+ private static Path scriptDir = null;
+ private static String scriptDirName = null;
+ private static String pid = null;
+
+ // number of levels in the subtree of subprocesses of map task
+ private static int numLevelsOfSubProcesses = 4;
+
+ /**
+ * Runs a job, kills the job and verifies if the map task and its
+ * subprocesses are also killed properly or not.
+ */
+ private static void runKillingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
+
+ conf.setJobName("testkilljobsubprocesses");
+ conf.setMapperClass(KillingMapperWithChildren.class);
+
+ scriptDir = new Path(TEST_ROOT_DIR + "/script");
+ RunningJob job = runJobAndSetProcessHandle(jt, conf);
+
+ // kill the job now
+ job.killJob();
+
+ while (job.cleanupProgress() == 0.0f) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ LOG.warn("sleep is interrupted:" + ie);
+ break;
+ }
+ }
+
+ validateKillingSubprocesses(job, conf);
+ // Checking the Job status
+ assertEquals(job.getJobState(), JobStatus.KILLED);
+ }
+
+ /**
+ * Runs a job that will fail and verifies if the subprocesses of failed map
+ * task are killed properly or not.
+ */
+ private static void runFailingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
+
+ conf.setJobName("testfailjobsubprocesses");
+ conf.setMapperClass(FailingMapperWithChildren.class);
+
+ // We don't want to run the failing map task 4 times. So we run it once and
+ // check if all the subprocesses are killed properly.
+ conf.setMaxMapAttempts(1);
+
+ scriptDir = new Path(TEST_ROOT_DIR + "/script");
+ RunningJob job = runJobAndSetProcessHandle(jt, conf);
+ signalTask(TEST_ROOT_DIR + "/failjob/signalFile", conf);
+ validateKillingSubprocesses(job, conf);
+ // Checking the Job status
+ assertEquals(job.getJobState(), JobStatus.FAILED);
+ }
+
+ /**
+ * Runs a job that will succeed and verifies if the subprocesses of succeeded
+ * map task are killed properly or not.
+ */
+ private static void runSuccessfulJobAndValidate(JobTracker jt, JobConf conf)
+ throws IOException {
+
+ conf.setJobName("testsucceedjobsubprocesses");
+ conf.setMapperClass(MapperWithChildren.class);
+
+ scriptDir = new Path(TEST_ROOT_DIR + "/script");
+ RunningJob job = runJobAndSetProcessHandle(jt, conf);
+ signalTask(TEST_ROOT_DIR + "/succeedjob/signalFile", conf);
+ validateKillingSubprocesses(job, conf);
+ // Checking the Job status
+ assertEquals(job.getJobState(), JobStatus.SUCCEEDED);
+ }
+
+ /**
+ * Runs the given job and saves the pid of map task.
+ * Also checks if the subprocesses of map task are alive.
+ */
+ private static RunningJob runJobAndSetProcessHandle(JobTracker jt, JobConf conf)
+ throws IOException {
+ RunningJob job = runJob(conf);
+ // Wait for the job to start running. Also stop if the job has already
+ // completed: a finished job never reports RUNNING, so without this
+ // check the loop would spin forever.
+ while (job.getJobState() != JobStatus.RUNNING && !job.isComplete()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ pid = null;
+ scriptDirName = scriptDir.toUri().getPath();
+ jobClient = new JobClient(conf);
+
+ // get the taskAttemptID of the map task and use it to get the pid
+ // of map task
+ TaskReport[] mapReports = jobClient.getMapTaskReports(job.getID());
+
+ JobInProgress jip = jt.getJob(job.getID());
+ for(TaskReport tr : mapReports) {
+ TaskInProgress tip = jip.getTaskInProgress(tr.getTaskID());
+
+ // for this tip, get active tasks of all attempts
+ while(tip.getActiveTasks().size() == 0) {
+ //wait till the activeTasks Tree is built
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ LOG.warn("sleep is interrupted:" + ie);
+ break;
+ }
+ }
+
+ for (Iterator<TaskAttemptID> it =
+ tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
+ TaskAttemptID id = it.next();
+ LOG.info("taskAttemptID of map task is " + id);
+
+ // Poll the TaskTracker until it registers a pid for this attempt.
+ while(pid == null) {
+ pid = mr.getTaskTrackerRunner(0).getTaskTracker().getPid(id);
+ if (pid == null) {
+ try {
+ Thread.sleep(500);
+ } catch(InterruptedException e) {
+ // Previously the interrupt was silently swallowed; log it
+ // and stop waiting, consistent with the sibling wait-loops.
+ LOG.warn("sleep is interrupted:" + e);
+ break;
+ }
+ }
+ }
+ LOG.info("pid of map task is " + pid);
+
+ // Checking if the map task is alive
+ assertTrue(ProcessTree.isAlive(pid));
+ LOG.info("The map task is alive before Job completion, as expected.");
+ }
+ }
+
+ // Checking if the descendant processes of map task are alive
+ if(ProcessTree.isSetsidAvailable) {
+ String childPid = UtilsForTests.getPidFromPidFile(
+ scriptDirName + "/childPidFile" + 0);
+ while(childPid == null) {
+ LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ LOG.warn("sleep is interrupted:" + ie);
+ break;
+ }
+ childPid = UtilsForTests.getPidFromPidFile(
+ scriptDirName + "/childPidFile" + 0);
+ }
+
+ // As childPidFile0(leaf process in the subtree of processes with
+ // map task as root) is created, all other child pid files should
+ // have been created already(See the script for details).
+ // Now check if the descendants of map task are alive.
+ for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+ childPid = UtilsForTests.getPidFromPidFile(
+ scriptDirName + "/childPidFile" + i);
+ LOG.info("pid of the descendant process at level " + i +
+ "in the subtree of processes(with the map task as the root)" +
+ " is " + childPid);
+ assertTrue("Unexpected: The subprocess at level " + i +
+ " in the subtree is not alive before Job completion",
+ ProcessTree.isAlive(childPid));
+ }
+ }
+ return job;
+ }
+
+ /**
+ * Verifies if the subprocesses of the map task are killed properly.
+ */
+ private static void validateKillingSubprocesses(RunningJob job, JobConf conf)
+ throws IOException {
+ // wait till the job finishes
+ while (!job.isComplete()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ // Checking if the map task got killed or not (assertFalse is clearer
+ // than assertTrue(!...)).
+ assertFalse(ProcessTree.isAlive(pid));
+ LOG.info("The map task is not alive after Job is completed, as expected.");
+
+ // Checking if the descendant processes of map task are killed properly
+ if(ProcessTree.isSetsidAvailable) {
+ for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+ String childPid = UtilsForTests.getPidFromPidFile(
+ scriptDirName + "/childPidFile" + i);
+ LOG.info("pid of the descendant process at level " + i +
+ "in the subtree of processes(with the map task as the root)" +
+ " is " + childPid);
+ assertFalse("Unexpected: The subprocess at level " + i +
+ " in the subtree is alive after Job completion",
+ ProcessTree.isAlive(childPid));
+ }
+ }
+ // Clean up the script dir (and its pid files) for the next sub-test.
+ FileSystem fs = FileSystem.get(conf);
+ if(fs.exists(scriptDir)) {
+ fs.delete(scriptDir, true);
+ }
+ }
+
+ /**
+ * Configures a single-map, zero-reduce job rooted under TEST_ROOT_DIR and
+ * submits it via UtilsForTests.runJob.
+ *
+ * @param conf job configuration to adjust and submit
+ * @return handle to the submitted (asynchronously running) job
+ * @throws IOException on filesystem or submission failure
+ */
+ private static RunningJob runJob(JobConf conf) throws IOException {
+
+ // NOTE(review): the input/output dirs are named "killjob" even when this
+ // is invoked for the fail/succeed variants — presumably intentional
+ // reuse of the same dirs across sub-tests; confirm.
+ final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
+ final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+
+ // Remove any pid files / scripts left behind by a previous sub-test.
+ FileSystem fs = FileSystem.get(conf);
+ if(fs.exists(scriptDir)) {
+ fs.delete(scriptDir, true);
+ }
+
+ // One map and no reduces: the test tracks a single map attempt's pid.
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(0);
+ conf.set("test.build.data", TEST_ROOT_DIR);
+
+ return UtilsForTests.runJob(conf, inDir, outDir);
+ }
+
+ /**
+ * Entry point: starts a single-node MiniMRCluster and runs the three
+ * scenarios (killed job, failing job, succeeding job) against it,
+ * verifying in each case that the map task's subprocess tree is killed.
+ * Skipped on Windows because the test relies on setsid semantics.
+ */
+ public void testJobKillFailAndSucceed() throws IOException {
+ if (Shell.WINDOWS) {
+ System.out.println(
+ "setsid doesn't work on WINDOWS as expected. Not testing");
+ return;
+ }
+
+ JobConf conf=null;
+ try {
+ // 1 tracker, local file system, 1 dir - smallest possible cluster.
+ mr = new MiniMRCluster(1, "file:///", 1);
+
+ // run the TCs
+ conf = mr.createJobConf();
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ runKillingJobAndValidate(jt, conf);
+ runFailingJobAndValidate(jt, conf);
+ runSuccessfulJobAndValidate(jt, conf);
+ } finally {
+ // Always tear the cluster down, even if a sub-test asserts/throws.
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Creates the signal file that a waiting mapper polls for; failure to
+ * create it is logged but not propagated (best-effort).
+ */
+ private static void signalTask(String signalFile, JobConf conf) {
+ Path signalPath = new Path(signalFile);
+ try {
+ FileSystem fileSys = FileSystem.get(conf);
+ fileSys.createNewFile(signalPath);
+ } catch(IOException e) {
+ LOG.warn("Unable to create signal file. " + e);
+ }
+ }
+
+ /**
+ * Runs a recursive shell script to create a chain of subprocesses.
+ * The script at recursion level $1 writes its own pid to
+ * scriptDir/childPidFile$1, recurses with $1-1 until 0, and the leaf
+ * sleeps forever. Blocks until the leaf's pid file appears.
+ * No-op where setsid is unavailable.
+ */
+ private static void runChildren(JobConf conf) throws IOException {
+ if (ProcessTree.isSetsidAvailable) {
+ FileSystem fs = FileSystem.get(conf);
+ // Re-derive TEST_ROOT_DIR inside the child JVM from the conf.
+ TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
+ scriptDir = new Path(TEST_ROOT_DIR + "/script");
+
+ // create shell script
+ Random rm = new Random();
+ Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
+ + ".sh");
+ String shellScript = scriptPath.toString();
+ String script =
+ "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
+ "echo hello\n" +
+ "if [ $1 != 0 ]\nthen\n" +
+ " sh " + shellScript + " $(($1-1))\n" +
+ "else\n" +
+ " while true\n do\n" +
+ " sleep 2\n" +
+ " done\n" +
+ "fi";
+ DataOutputStream file = fs.create(scriptPath);
+ file.writeBytes(script);
+ file.close();
+
+ // NOTE(review): the script file is executed directly, which assumes
+ // fs.create() produced an executable file — TODO confirm; recursive
+ // invocations go through "sh" explicitly. Also Runtime.exec(String)
+ // splits on whitespace, so this assumes TEST_ROOT_DIR has no spaces.
+ LOG.info("Calling script from map task of failjob : " + shellScript);
+ Runtime.getRuntime()
+ .exec(shellScript + " " + numLevelsOfSubProcesses);
+
+ // Busy-wait until the leaf (level 0) pid file exists, i.e. the whole
+ // subprocess chain has been spawned.
+ String childPid = UtilsForTests.getPidFromPidFile(scriptDir
+ + "/childPidFile" + 0);
+ while (childPid == null) {
+ LOG.warn(scriptDir + "/childPidFile" + 0 + " is null; Sleeping...");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ LOG.warn("sleep is interrupted:" + ie);
+ break;
+ }
+ childPid = UtilsForTests.getPidFromPidFile(scriptDir
+ + "/childPidFile" + 0);
+ }
+ }
+ }
+
+ /**
+ * Mapper that starts children: configure() spawns the subprocess chain,
+ * and map() is intended to spin until a signal file appears.
+ */
+ static class MapperWithChildren extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+ // NOTE(review): fs is never assigned anywhere in this class, so the
+ // "fs != null" guard in map() is always false and the wait loop below
+ // is dead code — the map exits immediately. Initializing fs in
+ // configure() would make map() wait, but note the path mismatch too:
+ // this waits on TEST_ROOT_DIR + "/script/signalFile" while the
+ // succeed-job scenario signals TEST_ROOT_DIR + "/succeedjob/signalFile".
+ // Both need confirming before a fix.
+ FileSystem fs = null;
+ final Path signal = new Path(TEST_ROOT_DIR + "/script/signalFile");
+ public void configure(JobConf conf) {
+ try {
+ // Spawn the recursive subprocess chain before any map() call.
+ runChildren(conf);
+ } catch (Exception e) {
+ LOG.warn("Exception in configure: " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ // Mapper waits for the signal(signal is the existence of a file)
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+ if (fs != null) {
+ while (!fs.exists(signal)) {// wait for signal file creation
+ try {
+ // Keep reporting progress so the task isn't timed out.
+ reporter.progress();
+ synchronized (this) {
+ this.wait(1000);
+ }
+ } catch (InterruptedException ie) {
+ System.out.println("Interrupted while the map was waiting for "
+ + " the signal.");
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Mapper that waits till it gets killed. Inherits the subprocess-spawning
+ * configure() from MapperWithChildren.
+ */
+ static class KillingMapperWithChildren extends MapperWithChildren {
+ public void configure(JobConf conf) {
+ super.configure(conf);
+ }
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+
+ try {
+ while(true) {//just wait till kill happens
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ // Log message fixed to name this class correctly (was
+ // "KillMapperWithChild").
+ LOG.warn("Exception in KillingMapperWithChildren.map:" + e);
+ // Restore the interrupt status for the task runner.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Mapper that fails when it receives a signal. Signal is existence of a file.
+ */
+ static class FailingMapperWithChildren extends MapperWithChildren {
+ public void configure(JobConf conf) {
+ // Spawns the subprocess chain via the parent class.
+ super.configure(conf);
+ }
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+ // NOTE(review): fs is inherited from MapperWithChildren and is never
+ // assigned, so this wait loop is skipped and the map fails
+ // immediately — verify whether that is intended.
+ if (fs != null) {
+ while (!fs.exists(signal)) {// wait for signal file creation
+ try {
+ reporter.progress();
+ synchronized (this) {
+ this.wait(1000);
+ }
+ } catch (InterruptedException ie) {
+ System.out.println("Interrupted while the map was waiting for "
+ + " the signal.");
+ break;
+ }
+ }
+ }
+ // Deliberate failure so the framework marks the attempt as failed.
+ throw new RuntimeException("failing map");
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Fri Mar 4 03:27:24 2011
@@ -440,7 +440,6 @@ public class TestTaskTrackerMemoryManage
// Create TaskMemoryMonitorThread
TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L,
- 5000L,
5000L);
// create process trees
// tree rooted at 100 is over limit immediately, as it is
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1076975&r1=1076974&r2=1076975&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 03:27:24 2011
@@ -548,7 +548,9 @@ public class UtilsForTests {
int numReds) throws IOException {
FileSystem fs = FileSystem.get(conf);
- fs.delete(outDir, true);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
if (!fs.exists(inDir)) {
fs.mkdirs(inDir);
}
@@ -683,5 +685,52 @@ public class UtilsForTests {
config.writeXml(fos);
fos.close();
}
+
+
+ /**
+ * Get PID from a pid-file.
+ *
+ * @param pidFileName
+ * Name of the pid-file.
+ * @return the PID string read from the pid-file. Returns null if the
+ * pidFileName points to a non-existing file or if read fails from the
+ * file.
+ */
+ public static String getPidFromPidFile(String pidFileName) {
+ BufferedReader pidFile = null;
+ FileReader fReader = null;
+ String pid = null;
+
+ try {
+ fReader = new FileReader(pidFileName);
+ pidFile = new BufferedReader(fReader);
+ } catch (FileNotFoundException f) {
+ LOG.debug("PidFile doesn't exist : " + pidFileName);
+ return pid;
+ }
+
+ try {
+ // The pid is expected on the first line of the file.
+ pid = pidFile.readLine();
+ } catch (IOException i) {
+ LOG.error("Failed to read from " + pidFileName);
+ } finally {
+ // Closing the BufferedReader also closes the wrapped FileReader.
+ // (The previous code closed fReader first and could leak pidFile
+ // if that close threw.)
+ try {
+ if (pidFile != null) {
+ pidFile.close();
+ } else if (fReader != null) {
+ fReader.close();
+ }
+ } catch (IOException i) {
+ LOG.warn("Error closing the stream " + pidFile);
+ }
+ }
+ return pid;
+ }
+
}