You are viewing a plain-text version of this content. The canonical version was linked from the word "here"; that hyperlink was not preserved in this plain-text conversion.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC
svn commit: r1079211 [8/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/contrib/fairscheduler/designdoc/ src/contrib/streaming/s...
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Mar 8 05:56:27 2011
@@ -30,13 +30,17 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
+import static org.apache.hadoop.mapred.TaskController.Signal;
class JvmManager {
@@ -49,8 +53,8 @@ class JvmManager {
public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
File stdout,File stderr,long logSize, File workDir,
- Map<String,String> env, JobConf conf) {
- return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
+ JobConf conf) {
+ return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,conf);
}
public JvmManager(TaskTracker tracker) {
@@ -69,7 +73,7 @@ class JvmManager {
return null;
}
- public void stop() {
+ public void stop() throws IOException, InterruptedException {
mapJvmManager.stop();
reduceJvmManager.stop();
}
@@ -114,7 +118,8 @@ class JvmManager {
return null;
}
- public void launchJvm(TaskRunner t, JvmEnv env) {
+ public void launchJvm(TaskRunner t, JvmEnv env)
+ throws IOException, InterruptedException {
if (t.getTask().isMapTask()) {
mapJvmManager.reapJvm(t, env);
} else {
@@ -138,7 +143,8 @@ class JvmManager {
}
}
- public void taskKilled(TaskRunner tr) {
+ public void taskKilled(TaskRunner tr
+ ) throws IOException, InterruptedException {
if (tr.getTask().isMapTask()) {
mapJvmManager.taskKilled(tr);
} else {
@@ -146,15 +152,7 @@ class JvmManager {
}
}
- void dumpStack(TaskRunner tr) {
- if (tr.getTask().isMapTask()) {
- mapJvmManager.dumpStack(tr);
- } else {
- reduceJvmManager.dumpStack(tr);
- }
- }
-
- public void killJvm(JVMId jvmId) {
+ public void killJvm(JVMId jvmId) throws IOException, InterruptedException {
if (jvmId.isMap) {
mapJvmManager.killJvm(jvmId);
} else {
@@ -167,15 +165,19 @@ class JvmManager {
* asynchronous deletion of work dir.
* @param tracker taskTracker
* @param task the task whose work dir needs to be deleted
- * @throws IOException
*/
- static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+ static void deleteWorkDir(TaskTracker tracker, Task task) {
+ String user = task.getUser();
+ String jobid = task.getJobID().toString();
+ String taskid = task.getTaskID().toString();
+ String workDir = TaskTracker.getTaskWorkDir(user, jobid, taskid,
+ task.isTaskCleanupTask());
+ String userDir = TaskTracker.getUserDir(user);
tracker.getCleanupThread().addToQueue(
- TaskTracker.buildTaskControllerTaskPathDeletionContexts(
- tracker.getLocalFileSystem(),
- tracker.getLocalFiles(tracker.getJobConf(), ""),
- task, true /* workDir */,
- tracker.getTaskController()));
+ new TaskController.DeletionContext(tracker.getTaskController(), false,
+ user,
+ workDir, tracker.getLocalDirs()));
+
}
static class JvmManagerForType {
@@ -192,18 +194,25 @@ class JvmManager {
Map <JVMId, String> jvmIdToPid =
new HashMap<JVMId, String>();
- int maxJvms;
- boolean isMap;
-
- TaskTracker tracker;
-
- Random rand = new Random(System.currentTimeMillis());
+ final int maxJvms;
+ final boolean isMap;
+ final TaskTracker tracker;
+ final long sleeptimeBeforeSigkill;
+ final Random rand = new Random();
+
+ static final String DELAY_BEFORE_KILL_KEY =
+ "mapred.tasktracker.tasks.sleeptime-before-sigkill";
+ // number of milliseconds to wait between TERM and KILL.
+ private static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 250;
public JvmManagerForType(int maxJvms, boolean isMap,
TaskTracker tracker) {
this.maxJvms = maxJvms;
this.isMap = isMap;
this.tracker = tracker;
+ sleeptimeBeforeSigkill =
+ tracker.getJobConf().getLong(DELAY_BEFORE_KILL_KEY,
+ DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
}
synchronized public void setRunningTaskForJvm(JVMId jvmId,
@@ -215,36 +224,20 @@ class JvmManager {
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
throws IOException {
- if (jvmToRunningTask.containsKey(jvmId)) {
- //Incase of JVM reuse, tasks are returned to previously launched
- //JVM via this method. However when a new task is launched
- //the task being returned has to be initialized.
- TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
- JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
- Task task = taskRunner.getTaskInProgress().getTask();
-
- // Initialize task dirs
- TaskControllerContext context =
- new TaskController.TaskControllerContext();
- context.env = jvmRunner.env;
- context.task = task;
- // If we are returning the same task as which the JVM was launched
- // we don't initialize task once again.
- if (!jvmRunner.env.conf.get(JobContext.TASK_ATTEMPT_ID).equals(
- task.getTaskID().toString())) {
- try {
- tracker.getTaskController().initializeTask(context);
- } catch (IOException e) {
- LOG.warn("Failed to initialize the new task "
- + task.getTaskID().toString() + " to be given to JVM with id "
- + jvmId);
- throw e;
- }
- }
-
- return taskRunner.getTaskInProgress();
- }
- return null;
+ final TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+ return null == taskRunner ? null : taskRunner.getTaskInProgress();
+ //if (jvmToRunningTask.containsKey(jvmId)) {
+ // //Incase of JVM reuse, tasks are returned to previously launched
+ // //JVM via this method. However when a new task is launched
+ // //the task being returned has to be initialized.
+ // TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+ // // TODO retained for MAPREDUCE-1100
+ // JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+ // Task task = taskRunner.getTaskInProgress().getTask();
+
+ // return taskRunner.getTaskInProgress();
+ //}
+ //return null;
}
synchronized public boolean isJvmknown(JVMId jvmId) {
@@ -262,7 +255,9 @@ class JvmManager {
}
}
- synchronized public void taskKilled(TaskRunner tr) {
+ synchronized public void taskKilled(TaskRunner tr
+ ) throws IOException,
+ InterruptedException {
JVMId jvmId = runningTaskToJvm.remove(tr);
if (jvmId != null) {
jvmToRunningTask.remove(jvmId);
@@ -270,29 +265,22 @@ class JvmManager {
}
}
- synchronized public void killJvm(JVMId jvmId) {
+ synchronized public void killJvm(JVMId jvmId)
+ throws IOException, InterruptedException {
JvmRunner jvmRunner;
if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
killJvmRunner(jvmRunner);
}
}
- private synchronized void killJvmRunner(JvmRunner jvmRunner) {
+ private synchronized void killJvmRunner(JvmRunner jvmRunner)
+ throws IOException, InterruptedException {
jvmRunner.kill();
removeJvm(jvmRunner.jvmId);
}
- synchronized void dumpStack(TaskRunner tr) {
- JVMId jvmId = runningTaskToJvm.get(tr);
- if (null != jvmId) {
- JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
- if (null != jvmRunner) {
- jvmRunner.dumpChildStacks();
- }
- }
- }
-
- synchronized public void stop() {
+ synchronized public void stop()
+ throws IOException, InterruptedException {
//since the kill() method invoked later on would remove
//an entry from the jvmIdToRunner map, we create a
//copy of the values and iterate over it (if we don't
@@ -310,7 +298,7 @@ class JvmManager {
jvmIdToPid.remove(jvmId);
}
private synchronized void reapJvm(
- TaskRunner t, JvmEnv env) {
+ TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
if (t.getTaskInProgress().wasKilled()) {
//the task was killed in-flight
//no need to do the rest of the operations
@@ -399,7 +387,7 @@ class JvmManager {
private void spawnNewJvm(JobID jobId, JvmEnv env,
TaskRunner t) {
- JvmRunner jvmRunner = new JvmRunner(env,jobId);
+ JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
//spawn the JVM in a new thread. Note that there will be very little
//extra overhead of launching the new thread for a new JVM since
@@ -435,71 +423,89 @@ class JvmManager {
JVMId jvmId;
volatile boolean busy = true;
private ShellCommandExecutor shexec; // shell terminal for running the task
- //context used for starting JVM
- private TaskControllerContext initalContext;
+ private Task firstTask;
- public JvmRunner(JvmEnv env, JobID jobId) {
+ public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
this.env = env;
this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
+ this.firstTask = firstTask;
LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
}
+
+ @Override
public void run() {
- runChild(env);
+ try {
+ runChild(env);
+ } catch (InterruptedException ie) {
+ return;
+ } catch (IOException e) {
+ LOG.warn("Caught IOException in JVMRunner", e);
+ } catch (Throwable e) {
+ LOG.error("Caught Throwable in JVMRunner. Aborting TaskTracker.", e);
+ System.exit(1);
+ } finally {
+ // TODO MR-1100
+ //jvmFinished();
+ }
}
- public void runChild(JvmEnv env) {
- initalContext = new TaskControllerContext();
+ public void runChild(JvmEnv env)
+ throws IOException, InterruptedException {
+ int exitCode = 0;
try {
env.vargs.add(Integer.toString(jvmId.getId()));
- //Launch the task controller to run task JVM
- initalContext.task = jvmToRunningTask.get(jvmId).getTask();
- initalContext.env = env;
- tracker.getTaskController().launchTaskJVM(initalContext);
+ TaskRunner runner = jvmToRunningTask.get(jvmId);
+ if (runner != null) {
+ Task task = runner.getTask();
+ //Launch the task controller to run task JVM
+ String user = task.getUser();
+ TaskAttemptID taskAttemptId = task.getTaskID();
+ String taskAttemptIdStr = task.isTaskCleanupTask() ?
+ (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
+ taskAttemptId.toString();
+ exitCode = tracker.getTaskController().launchTask(user,
+ jvmId.jobId.toString(), taskAttemptIdStr, env.setup,
+ env.vargs, env.workDir, env.stdout.toString(),
+ env.stderr.toString());
+ }
} catch (IOException ioe) {
// do nothing
// error and output are appropriately redirected
} finally { // handle the exit code
- shexec = initalContext.shExec;
- if (shexec == null) {
- return;
- }
-
+ // although the process has exited before we get here,
+ // make sure the entire process group has also been killed.
kill();
- int exitCode = shexec.getExitCode();
updateOnJvmExit(jvmId, exitCode);
LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
+ ". Number of tasks it ran: " + numTasksRan);
- try {
- // In case of jvm-reuse,
- //the task jvm cleans up the common workdir for every
- //task at the beginning of each task in the task JVM.
- //For the last task, we do it here.
- if (env.conf.getNumTasksToExecutePerJvm() != 1) {
- deleteWorkDir(tracker, initalContext.task);
- }
- } catch (IOException ie){}
+ deleteWorkDir(tracker, firstTask);
}
}
/**
- * Kills the process. Also kills its subprocesses if the process(root of subtree
- * of processes) is created using setsid.
+ * Kills the process. Also kills its subprocesses if the process(root of
+ * subtree of processes) is created using setsid.
*/
- synchronized void kill() {
+ synchronized void kill() throws IOException, InterruptedException {
if (!killed) {
TaskController controller = tracker.getTaskController();
// Check inital context before issuing a kill to prevent situations
// where kill is issued before task is launched.
- if (initalContext != null && initalContext.env != null) {
- initalContext.pid = jvmIdToPid.get(jvmId);
- initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
- .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
- // Destroy the task jvm
- controller.destroyTaskJVM(initalContext);
+ String pidStr = jvmIdToPid.get(jvmId);
+ if (pidStr != null) {
+ String user = env.conf.getUser();
+ int pid = Integer.parseInt(pidStr);
+ // start a thread that will kill the process dead
+ if (sleeptimeBeforeSigkill > 0) {
+ controller.signalTask(user, pid, Signal.QUIT);
+ controller.signalTask(user, pid, Signal.TERM);
+ new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill,
+ Signal.KILL, tracker.getTaskController()).start();
+ } else {
+ controller.signalTask(user, pid, Signal.KILL);
+ }
} else {
LOG.info(String.format("JVM Not killed %s but just removed", jvmId
.toString()));
@@ -508,37 +514,6 @@ class JvmManager {
}
}
- /** Send a signal to the JVM requesting that it dump a stack trace,
- * and wait for a timeout interval to give this signal time to be
- * processed.
- */
- void dumpChildStacks() {
- if (!killed) {
- TaskController controller = tracker.getTaskController();
- // Check inital context before issuing a signal to prevent situations
- // where signal is issued before task is launched.
- if (initalContext != null && initalContext.env != null) {
- initalContext.pid = jvmIdToPid.get(jvmId);
- initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
- .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
- // signal the task jvm
- controller.dumpTaskStack(initalContext);
-
- // We're going to kill the jvm with SIGKILL after this,
- // so we should wait for a few seconds first to ensure that
- // the SIGQUIT has time to be processed.
- try {
- Thread.sleep(initalContext.sleeptimeBeforeSigkill);
- } catch (InterruptedException e) {
- LOG.warn("Sleep interrupted : " +
- StringUtils.stringifyException(e));
- }
- }
- }
- }
-
public void taskRan() {
busy = false;
numTasksRan++;
@@ -565,15 +540,13 @@ class JvmManager {
JobConf conf;
Map<String, String> env;
- public JvmEnv(List<String> setup, Vector<String> vargs, File stdout,
- File stderr, long logSize, File workDir, Map<String,String> env,
- JobConf conf) {
+ public JvmEnv(List <String> setup, Vector<String> vargs, File stdout,
+ File stderr, long logSize, File workDir, JobConf conf) {
this.setup = setup;
this.vargs = vargs;
this.stdout = stdout;
this.stderr = stderr;
this.workDir = workDir;
- this.env = env;
this.conf = conf;
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Mar 8 05:56:27 2011
@@ -14,30 +14,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.hadoop.mapred;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A {@link TaskController} that runs the task JVMs as the user
@@ -48,8 +45,8 @@ import org.apache.hadoop.util.Shell.Shel
* JVM and killing it when needed, and also initializing and
* finalizing the task environment.
* <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller mapreduce.job.user.name command command-args, where</p>
- * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
* <p>command is one of the cardinal value of the
* {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
* <p>command-args depends on the command being launched.</p>
@@ -62,52 +59,78 @@ class LinuxTaskController extends TaskCo
private static final Log LOG =
LogFactory.getLog(LinuxTaskController.class);
-
- // Name of the executable script that will contain the child
- // JVM command line. See writeCommand for details.
- private static final String COMMAND_FILE = "taskjvm.sh";
// Path to the setuid executable.
- private static String taskControllerExe;
+ private String taskControllerExe;
+ private static final String TASK_CONTROLLER_EXEC_KEY =
+ "mapreduce.tasktracker.task-controller.exe";
- static {
- // the task-controller is expected to be under the $HADOOP_HOME/bin
- // directory.
- File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
- taskControllerExe =
- new File(hadoopBin, "task-controller").getAbsolutePath();
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ taskControllerExe = getTaskControllerExecutablePath(conf);
}
-
+
public LinuxTaskController() {
super();
}
-
+
+ protected String getTaskControllerExecutablePath(Configuration conf) {
+ File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+ String defaultTaskController =
+ new File(hadoopBin, "task-controller").getAbsolutePath();
+ return null == conf
+ ? defaultTaskController
+ : conf.get(TASK_CONTROLLER_EXEC_KEY, defaultTaskController);
+ }
+
/**
* List of commands that the setuid script will execute.
*/
- enum TaskControllerCommands {
- INITIALIZE_USER,
- INITIALIZE_JOB,
- INITIALIZE_DISTRIBUTEDCACHE_FILE,
- LAUNCH_TASK_JVM,
- INITIALIZE_TASK,
- TERMINATE_TASK_JVM,
- KILL_TASK_JVM,
- RUN_DEBUG_SCRIPT,
- SIGQUIT_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP,
- ENABLE_JOB_FOR_CLEANUP
+ enum Commands {
+ INITIALIZE_JOB(0),
+ LAUNCH_TASK_JVM(1),
+ SIGNAL_TASK(2),
+ DELETE_AS_USER(3),
+ DELETE_LOG_AS_USER(4);
+
+ private int value;
+ Commands(int value) {
+ this.value = value;
+ }
+ int getValue() {
+ return value;
+ }
+ }
+
+ /**
+ * Result codes returned from the C task-controller.
+ * These must match the values in task-controller.h.
+ */
+ enum ResultCode {
+ OK(0),
+ INVALID_USER_NAME(2),
+ INVALID_TASK_PID(9),
+ INVALID_TASKCONTROLLER_PERMISSIONS(22),
+ INVALID_CONFIG_FILE(24);
+
+ private final int value;
+ ResultCode(int value) {
+ this.value = value;
+ }
+ int getValue() {
+ return value;
+ }
}
@Override
- public void setup() throws IOException {
- super.setup();
-
+ public void setup(LocalDirAllocator allocator) throws IOException {
+
// Check the permissions of the task-controller binary by running it plainly.
- // If permissions are correct, it returns an error code 1, else it returns
+ // If permissions are correct, it returns an error code 1, else it returns
// 24 or something else if some other bugs are also present.
String[] taskControllerCmd =
- new String[] { getTaskControllerExecutablePath() };
+ new String[] { taskControllerExe };
ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
try {
shExec.execute();
@@ -120,52 +143,96 @@ class LinuxTaskController extends TaskCo
+ "permissions/ownership with exit code " + exitCode, e);
}
}
+ this.allocator = allocator;
+ }
+
+
+ @Override
+ public void initializeJob(String user, String jobid, Path credentials,
+ Path jobConf, TaskUmbilicalProtocol taskTracker,
+ InetSocketAddress ttAddr
+ ) throws IOException, InterruptedException {
+ List<String> command = new ArrayList<String>(
+ Arrays.asList(taskControllerExe,
+ user,
+ Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+ jobid,
+ credentials.toUri().getPath().toString(),
+ jobConf.toUri().getPath().toString()));
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+ command.add(jvm.toString());
+ command.add("-classpath");
+ command.add(System.getProperty("java.class.path"));
+ command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+ command.add("-Dhadoop.root.logger=INFO,console");
+ command.add(JobLocalizer.class.getName()); // main of JobLocalizer
+ command.add(user);
+ command.add(jobid);
+ // add the task tracker's reporting address
+ command.add(ttAddr.getHostName());
+ command.add(Integer.toString(ttAddr.getPort()));
+ String[] commandArray = command.toArray(new String[0]);
+ ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("initializeJob: " + Arrays.toString(commandArray));
+ }
+ try {
+ shExec.execute();
+ if (LOG.isDebugEnabled()) {
+ logOutput(shExec.getOutput());
+ }
+ } catch (ExitCodeException e) {
+ int exitCode = shExec.getExitCode();
+ logOutput(shExec.getOutput());
+ throw new IOException("Job initialization failed (" + exitCode + ")", e);
+ }
}
- /**
- * Launch a task JVM that will run as the owner of the job.
- *
- * This method launches a task JVM by executing a setuid executable that will
- * switch to the user and run the task. Also does initialization of the first
- * task in the same setuid process launch.
- */
@Override
- void launchTaskJVM(TaskController.TaskControllerContext context)
- throws IOException {
- JvmEnv env = context.env;
- // get the JVM command line.
- String cmdLine =
- TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, true);
-
- StringBuffer sb = new StringBuffer();
- //export out all the environment variable before child command as
- //the setuid/setgid binaries would not be getting, any environmental
- //variables which begin with LD_*.
- for(Entry<String, String> entry : env.env.entrySet()) {
- sb.append("export ");
- sb.append(entry.getKey());
- sb.append("=");
- sb.append(entry.getValue());
- sb.append("\n");
- }
- sb.append(cmdLine);
- // write the command to a file in the
- // task specific cache directory
- writeCommand(sb.toString(), getTaskCacheDirectory(context,
- context.env.workDir));
-
- // Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context,
- context.env.workDir);
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- TaskControllerCommands.LAUNCH_TASK_JVM,
- env.conf.getUser(),
- launchTaskJVMArgs, env.workDir, env.env);
- context.shExec = shExec;
+ public int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException {
+
+ ShellCommandExecutor shExec = null;
try {
+ FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+ long logSize = 0; //TODO, Ref BUG:2854624
+ // get the JVM command line.
+ String cmdLine =
+ TaskLog.buildCommandLine(setup, jvmArguments,
+ new File(stdout), new File(stderr), logSize, true);
+
+ // write the command to a file in the
+ // task specific cache directory
+ Path p = new Path(allocator.getLocalPathForWrite(
+ TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+ getConf()), COMMAND_FILE);
+ String commandFile = writeCommand(cmdLine, rawFs, p);
+
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
+ jobId,
+ attemptId,
+ currentWorkDirectory.toString(),
+ commandFile};
+ shExec = new ShellCommandExecutor(command);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("launchTask: " + Arrays.toString(command));
+ }
shExec.execute();
} catch (Exception e) {
+ if (shExec == null) {
+ return -1;
+ }
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from task is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
@@ -177,476 +244,79 @@ class LinuxTaskController extends TaskCo
LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
logOutput(shExec.getOutput());
}
- throw new IOException(e);
+ return exitCode;
}
if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ LOG.debug("Output from LinuxTaskController's launchTask follows:");
logOutput(shExec.getOutput());
}
+ return 0;
}
-
- /**
- * Launch the debug script process that will run as the owner of the job.
- *
- * This method launches the task debug script process by executing a setuid
- * executable that will switch to the user and run the task.
- */
+
@Override
- void runDebugScript(DebugScriptContext context) throws IOException {
- String debugOut = FileUtil.makeShellPath(context.stdout);
- String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
- writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
- // Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
- runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(),
- launchTaskJVMArgs, context.workDir, null);
- }
- /**
- * Helper method that runs a LinuxTaskController command
- *
- * @param taskControllerCommand
- * @param user
- * @param cmdArgs
- * @param env
- * @throws IOException
- */
- private void runCommand(TaskControllerCommands taskControllerCommand,
- String user, List<String> cmdArgs, File workDir, Map<String, String> env)
- throws IOException {
-
- ShellCommandExecutor shExec =
- buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
- workDir, env);
- try {
- shExec.execute();
- } catch (Exception e) {
- LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
- + shExec.getExitCode());
- LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
- + StringUtils.stringifyException(e));
- LOG.info("Output from LinuxTaskController's "
- + taskControllerCommand.toString() + " follows:");
- logOutput(shExec.getOutput());
- throw new IOException(e);
+ public void deleteAsUser(String user, String subDir, String... baseDirs)
+ throws IOException {
+ List<String> command = new ArrayList<String>(
+ Arrays.asList(
+ taskControllerExe,
+ user,
+ Integer.toString(Commands.DELETE_AS_USER.getValue()),
+ subDir));
+ for (String baseDir : baseDirs) {
+ command.add(baseDir);
}
+ String[] commandArray = command.toArray(new String[0]);
+ ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's "
- + taskControllerCommand.toString() + " follows:");
- logOutput(shExec.getOutput());
- }
- }
-
- /**
- * Returns list of arguments to be passed while initializing a new task. See
- * {@code buildTaskControllerExecutor(TaskControllerCommands, String,
- * List<String>, JvmEnv)} documentation.
- *
- * @param context
- * @return Argument to be used while launching Task VM
- */
- private List<String> buildInitializeTaskArgs(TaskExecContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- String taskId = context.task.getTaskID().toString();
- String jobId = getJobId(context);
- commandArgs.add(jobId);
- if (!context.task.isTaskCleanupTask()) {
- commandArgs.add(taskId);
- } else {
- commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+ LOG.debug("deleteAsUser: " + Arrays.toString(commandArray));
}
- return commandArgs;
+ shExec.execute();
}
@Override
- void initializeTask(TaskControllerContext context)
- throws IOException {
+ public void deleteLogAsUser(String user, String subDir) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
+ subDir};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do "
- + TaskControllerCommands.INITIALIZE_TASK.toString()
- + " for " + context.task.getTaskID().toString());
- }
- runCommand(TaskControllerCommands.INITIALIZE_TASK,
- context.env.conf.getUser(),
- buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
- }
-
- /**
- * Builds the args to be passed to task-controller for enabling of task for
- * cleanup. Last arg in this List is either $attemptId or $attemptId/work
- */
- private List<String> buildTaskCleanupArgs(
- TaskControllerTaskPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.task.getJobID().toString());
-
- String workDir = "";
- if (context.isWorkDir) {
- workDir = "/work";
- }
- if (context.task.isTaskCleanupTask()) {
- commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
- + workDir);
- } else {
- commandArgs.add(context.task.getTaskID() + workDir);
+ LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
}
-
- return commandArgs;
+ shExec.execute();
}
- /**
- * Builds the args to be passed to task-controller for enabling of job for
- * cleanup. Last arg in this List is $jobid.
- */
- private List<String> buildJobCleanupArgs(
- TaskControllerJobPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(2);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.jobId.toString());
-
- return commandArgs;
- }
-
- /**
- * Enables the task for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
- @Override
- void enableTaskForCleanup(PathDeletionContext context)
- throws IOException {
- if (context instanceof TaskControllerTaskPathDeletionContext) {
- TaskControllerTaskPathDeletionContext tContext =
- (TaskControllerTaskPathDeletionContext) context;
- enablePathForCleanup(tContext,
- TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
- buildTaskCleanupArgs(tContext));
- }
- else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerTaskPathDeletionContext.");
- }
- }
-
- /**
- * Enables the job for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
@Override
- void enableJobForCleanup(PathDeletionContext context)
- throws IOException {
- if (context instanceof TaskControllerJobPathDeletionContext) {
- TaskControllerJobPathDeletionContext tContext =
- (TaskControllerJobPathDeletionContext) context;
- enablePathForCleanup(tContext,
- TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
- buildJobCleanupArgs(tContext));
- } else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerJobPathDeletionContext.");
- }
- }
-
- /**
- * Enable a path for cleanup
- * @param c {@link TaskControllerPathDeletionContext} for the path to be
- * cleaned up
- * @param command {@link TaskControllerCommands} for task/job cleanup
- * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
- * path cleanup
- */
- private void enablePathForCleanup(TaskControllerPathDeletionContext c,
- TaskControllerCommands command,
- List<String> cleanupArgs) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
- }
-
- if ( c.user != null && c.fs instanceof LocalFileSystem) {
- try {
- runCommand(command, c.user, cleanupArgs, null, null);
- } catch(IOException e) {
- LOG.warn("Unable to change permissions for " + c.fullPath);
- }
- }
- else {
- throw new IllegalArgumentException("Either user is null or the "
- + "file system is not local file system.");
- }
- }
-
- private void logOutput(String output) {
- String shExecOutput = output;
- if (shExecOutput != null) {
- for (String str : shExecOutput.split("\n")) {
- LOG.info(str);
- }
- }
- }
-
- private String getJobId(TaskExecContext context) {
- String taskId = context.task.getTaskID().toString();
- TaskAttemptID tId = TaskAttemptID.forName(taskId);
- String jobId = tId.getJobID().toString();
- return jobId;
- }
-
- /**
- * Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskControllerCommands,
- * String, List<String>, JvmEnv)} documentation.
- * @param context
- * @return Argument to be used while launching Task VM
- */
- private List<String> buildLaunchTaskArgs(TaskExecContext context,
- File workDir) {
- List<String> commandArgs = new ArrayList<String>(3);
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context, workDir));
- LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context, workDir)),
- context) );
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context, workDir)),
- context));
- commandArgs.addAll(buildInitializeTaskArgs(context));
- return commandArgs;
- }
-
- // Get the directory from the list of directories configured
- // in Configs.LOCAL_DIR chosen for storing data pertaining to
- // this task.
- private String getDirectoryChosenForTask(File directory,
- TaskExecContext context) {
- String jobId = getJobId(context);
- String taskId = context.task.getTaskID().toString();
- for (String dir : mapredLocalDirs) {
- File mapredDir = new File(dir);
- File taskDir =
- new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
- .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
- .getParentFile();
- if (directory.equals(taskDir)) {
- return dir;
- }
- }
-
- LOG.error("Couldn't parse task cache directory correctly");
- throw new IllegalArgumentException("invalid task cache directory "
- + directory.getAbsolutePath());
- }
-
- /**
- * Builds the command line for launching/terminating/killing task JVM.
- * Following is the format for launching/terminating/killing task JVM
- * <br/>
- * For launching following is command line argument:
- * <br/>
- * {@code mapreduce.job.user.name command tt-root job_id task_id}
- * <br/>
- * For terminating/killing task jvm.
- * {@code mapreduce.job.user.name command tt-root task-pid}
- *
- * @param command command to be executed.
- * @param userName mapreduce.job.user.name
- * @param cmdArgs list of extra arguments
- * @param workDir working directory for the task-controller
- * @param env JVM environment variables.
- * @return {@link ShellCommandExecutor}
- * @throws IOException
- */
- private ShellCommandExecutor buildTaskControllerExecutor(
- TaskControllerCommands command, String userName, List<String> cmdArgs,
- File workDir, Map<String, String> env)
- throws IOException {
- String[] taskControllerCmd = new String[3 + cmdArgs.size()];
- taskControllerCmd[0] = getTaskControllerExecutablePath();
- taskControllerCmd[1] = userName;
- taskControllerCmd[2] = String.valueOf(command.ordinal());
- int i = 3;
- for (String cmdArg : cmdArgs) {
- taskControllerCmd[i++] = cmdArg;
- }
+ public boolean signalTask(String user, int taskPid,
+ Signal signal) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.SIGNAL_TASK.getValue()),
+ Integer.toString(taskPid),
+ Integer.toString(signal.getValue())};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
if (LOG.isDebugEnabled()) {
- for (String cmd : taskControllerCmd) {
- LOG.debug("taskctrl command = " + cmd);
- }
- }
- ShellCommandExecutor shExec = null;
- if(workDir != null && workDir.exists()) {
- shExec = new ShellCommandExecutor(taskControllerCmd,
- workDir, env);
- } else {
- shExec = new ShellCommandExecutor(taskControllerCmd);
- }
-
- return shExec;
- }
-
- // Return the task specific directory under the cache.
- private String getTaskCacheDirectory(TaskExecContext context,
- File workDir) {
- // In the case of JVM reuse, the task specific directory
- // is different from what is set with respect with
- // env.workDir. Hence building this from the taskId everytime.
- String taskId = context.task.getTaskID().toString();
- File cacheDirForJob = workDir.getParentFile().getParentFile();
- if(context.task.isTaskCleanupTask()) {
- taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
- }
- return new File(cacheDirForJob, taskId).getAbsolutePath();
- }
-
- // Write the JVM command line to a file under the specified directory
- // Note that the JVM will be launched using a setuid executable, and
- // could potentially contain strings defined by a user. Hence, to
- // prevent special character attacks, we write the command line to
- // a file and execute it.
- private void writeCommand(String cmdLine,
- String directory) throws IOException {
-
- PrintWriter pw = null;
- String commandFile = directory + File.separator + COMMAND_FILE;
- LOG.info("Writing commands to " + commandFile);
- LOG.info("--------Commands Begin--------");
- LOG.info(cmdLine);
- LOG.info("--------Commands End--------");
- try {
- FileWriter fw = new FileWriter(commandFile);
- BufferedWriter bw = new BufferedWriter(fw);
- pw = new PrintWriter(bw);
- pw.write(cmdLine);
- } catch (IOException ioe) {
- LOG.error("Caught IOException while writing JVM command line to file. "
- + ioe.getMessage());
- } finally {
- if (pw != null) {
- pw.close();
- }
- // set execute permissions for all on the file.
- File f = new File(commandFile);
- if (f.exists()) {
- f.setReadable(true, false);
- f.setExecutable(true, false);
- }
+ LOG.debug("signalTask: " + Arrays.toString(command));
}
- }
-
- private List<String> buildInitializeJobCommandArgs(
- JobInitializationContext context) {
- List<String> initJobCmdArgs = new ArrayList<String>();
- initJobCmdArgs.add(context.jobid.toString());
- return initJobCmdArgs;
- }
-
- @Override
- void initializeJob(JobInitializationContext context)
- throws IOException {
- LOG.debug("Going to initialize job " + context.jobid.toString()
- + " on the TT");
- runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
- buildInitializeJobCommandArgs(context), context.workDir, null);
- }
-
- @Override
- public void initializeDistributedCacheFile(DistributedCacheFileContext context)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to initialize distributed cache for " + context.user
- + " with localizedBaseDir " + context.localizedBaseDir +
- " and uniqueString " + context.uniqueString);
- }
- List<String> args = new ArrayList<String>();
- // Here, uniqueString might start with '-'. Adding -- in front of the
- // arguments indicates that they are non-option parameters.
- args.add("--");
- args.add(context.localizedBaseDir.toString());
- args.add(context.uniqueString);
- runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
- context.user, args, context.workDir, null);
- }
-
- @Override
- public void initializeUser(InitializationContext context)
- throws IOException {
- LOG.debug("Going to initialize user directories for " + context.user
- + " on the TT");
- runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
- new ArrayList<String>(), context.workDir, null);
- }
-
- /**
- * API which builds the command line to be pass to LinuxTaskController
- * binary to terminate/kill the task. See
- * {@code buildTaskControllerExecutor(TaskControllerCommands,
- * String, List<String>, JvmEnv)} documentation.
- *
- *
- * @param context context of task which has to be passed kill signal.
- *
- */
- private List<String> buildKillTaskCommandArgs(TaskControllerContext
- context){
- List<String> killTaskJVMArgs = new ArrayList<String>();
- killTaskJVMArgs.add(context.pid);
- return killTaskJVMArgs;
- }
-
- /**
- * Convenience method used to sending appropriate signal to the task
- * VM
- * @param context
- * @param command
- * @throws IOException
- */
- protected void signalTask(TaskControllerContext context,
- TaskControllerCommands command) throws IOException{
- if(context.task == null) {
- LOG.info("Context task is null; not signaling the JVM");
- return;
- }
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env.workDir,
- context.env.env);
try {
shExec.execute();
- } catch (Exception e) {
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
- throw new IOException(e);
- }
- }
-
- @Override
- void terminateTask(TaskControllerContext context) {
- try {
- signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending kill to the Task VM " +
- StringUtils.stringifyException(e));
- }
- }
-
- @Override
- void killTask(TaskControllerContext context) {
- try {
- signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending destroy to the Task VM " +
- StringUtils.stringifyException(e));
+ } catch (ExitCodeException e) {
+ int ret_code = shExec.getExitCode();
+ if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) {
+ return false;
+ }
+ logOutput(shExec.getOutput());
+ throw new IOException("Problem signalling task " + taskPid + " with " +
+ signal + "; exit = " + ret_code);
}
+ return true;
}
@Override
- void dumpTaskStack(TaskControllerContext context) {
- try {
- signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
- StringUtils.stringifyException(e));
- }
- }
-
- protected String getTaskControllerExecutablePath() {
- return taskControllerExe;
+ public String getRunAsUser(JobConf conf) {
+ return conf.getUser();
}
}
+
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Mar 8 05:56:27 2011
@@ -38,7 +38,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -79,6 +78,7 @@ public class LocalJobRunner implements C
private AtomicInteger map_tasks = new AtomicInteger(0);
private int reduce_tasks = 0;
final Random rand = new Random();
+ private final TaskController taskController = new DefaultTaskController();
private JobTrackerInstrumentation myMetrics = null;
@@ -118,7 +118,7 @@ public class LocalJobRunner implements C
private FileSystem localFs;
boolean killed = false;
- private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+ private TrackerDistributedCacheManager trackerDistributedCacheManager;
private TaskDistributedCacheManager taskDistributedCacheManager;
public long getProtocolVersion(String protocol, long clientVersion) {
@@ -136,14 +136,12 @@ public class LocalJobRunner implements C
// Manage the distributed cache. If there are files to be copied,
// this will trigger localFile to be re-written again.
- this.trackerDistributerdCacheManager =
- new TrackerDistributedCacheManager(conf, new DefaultTaskController());
+ this.trackerDistributedCacheManager =
+ new TrackerDistributedCacheManager(conf);
this.taskDistributedCacheManager =
- trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(
- new LocalDirAllocator(MRConfig.LOCAL_DIR),
- new File(systemJobDir.toString()),
- "archive", "archive");
+ trackerDistributedCacheManager.newTaskDistributedCacheManager(
+ jobid, conf);
+ taskDistributedCacheManager.setupCache(conf, "archive", "archive");
if (DistributedCache.getSymlink(conf)) {
// This is not supported largely because,
@@ -460,7 +458,7 @@ public class LocalJobRunner implements C
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
taskDistributedCacheManager.release();
- trackerDistributerdCacheManager.purgeCache();
+ trackerDistributedCacheManager.purgeCache();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
@@ -534,6 +532,14 @@ public class LocalJobRunner implements C
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
}
+
+ @Override
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes)
+ throws IOException {
+ trackerDistributedCacheManager.setArchiveSizes(jobId, sizes);
+ }
public boolean canCommit(TaskAttemptID taskid)
throws IOException {
@@ -580,6 +586,7 @@ public class LocalJobRunner implements C
this.fs = FileSystem.getLocal(conf);
this.conf = conf;
myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
+ taskController.setConf(conf);
}
// JobSubmissionProtocol methods
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java Tue Mar 8 05:56:27 2011
@@ -155,8 +155,10 @@ class MapTask extends Task {
@Override
public TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) {
- return new MapTaskRunner(tip, tracker, this.conf);
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new MapTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Tue Mar 8 05:56:27 2011
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.mapred;
+import java.io.IOException;
+
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.log4j.Level;
/** Runs a map task. */
class MapTaskRunner extends TaskRunner {
- public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
- super(task, tracker, conf);
+ public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf,
+ TaskTracker.RunningJob rjob) throws IOException {
+ super(task, tracker, conf, rjob);
}
@Override
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar 8 05:56:27 2011
@@ -146,9 +146,10 @@ public class ReduceTask extends Task {
}
@Override
- public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
- throws IOException {
- return new ReduceTaskRunner(tip, tracker, this.conf);
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Mar 8 05:56:27 2011
@@ -26,9 +26,10 @@ import org.apache.log4j.Level;
class ReduceTaskRunner extends TaskRunner {
public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker,
- JobConf conf) throws IOException {
+ JobConf conf, TaskTracker.RunningJob rjob
+ ) throws IOException {
- super(task, tracker, conf);
+ super(task, tracker, conf, rjob);
}
public void close() throws IOException {
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 05:56:27 2011
@@ -458,7 +458,9 @@ abstract public class Task implements Wr
/** Return an approprate thread runner for this task.
* @param tip TODO*/
public abstract TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) throws IOException;
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException;
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java Tue Mar 8 05:56:27 2011
@@ -14,11 +14,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -26,15 +28,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.fs.permission.FsPermission;
+
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
/**
* Controls initialization, finalization and clean up of tasks, and
@@ -46,392 +45,250 @@ import org.apache.hadoop.classification.
* performing the actual actions.
*
* <br/>
+ *
+ * NOTE: This class is internal only class and not intended for users!!
*/
-@InterfaceAudience.Private
public abstract class TaskController implements Configurable {
+ /**
+ * The constants for the signals.
+ */
+ public enum Signal {
+ NULL(0, "NULL"), QUIT(3, "SIGQUIT"),
+ KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
+ private final int value;
+ private final String str;
+ private Signal(int value, String str) {
+ this.str = str;
+ this.value = value;
+ }
+ public int getValue() {
+ return value;
+ }
+ @Override
+ public String toString() {
+ return str;
+ }
+ }
+
private Configuration conf;
-
+
public static final Log LOG = LogFactory.getLog(TaskController.class);
+ //Name of the executable script that will contain the child
+ // JVM command line. See writeCommand for details.
+ protected static final String COMMAND_FILE = "taskjvm.sh";
+
+ protected LocalDirAllocator allocator;
+
+ final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx--------
+
public Configuration getConf() {
return conf;
}
- // The list of directory paths specified in the variable Configs.LOCAL_DIR
- // This is used to determine which among the list of directories is picked up
- // for storing data for a particular task.
- protected String[] mapredLocalDirs;
-
public void setConf(Configuration conf) {
this.conf = conf;
- mapredLocalDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
}
/**
- * Sets up the permissions of the following directories on all the configured
- * disks:
- * <ul>
- * <li>mapreduce.cluster.local.directories</li>
- * <li>Hadoop log directories</li>
- * </ul>
+ * Does initialization and setup.
+ * @param allocator the local dir allocator to use
*/
- public void setup() throws IOException {
- for (String localDir : this.mapredLocalDirs) {
- // Set up the mapreduce.cluster.local.directories.
- File mapredlocalDir = new File(localDir);
- if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
- LOG.warn("Unable to create mapreduce.cluster.local.directory : "
- + mapredlocalDir.getPath());
- } else {
- Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
- Localizer.PermissionsHandler.sevenFiveFive);
- }
- }
-
- // Set up the user log directory
- File taskLog = TaskLog.getUserLogDir();
- if (!taskLog.exists() && !taskLog.mkdirs()) {
- LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
- } else {
- Localizer.PermissionsHandler.setPermissions(taskLog,
- Localizer.PermissionsHandler.sevenFiveFive);
- }
- }
-
+ public abstract void setup(LocalDirAllocator allocator) throws IOException;
+
/**
- * Take task-controller specific actions to initialize job. This involves
- * setting appropriate permissions to job-files so as to secure the files to
- * be accessible only by the user's tasks.
- *
+ * Create all of the directories necessary for the job to start and download
+ * all of the job and private distributed cache files.
+ * Creates both the user directories and the job log directory.
+ * @param user the user name
+ * @param jobid the job
+ * @param credentials a filename containing the job secrets
+ * @param jobConf the path to the localized configuration file
+ * @param taskTracker the connection the task tracker
+ * @param ttAddr the tasktracker's RPC address
* @throws IOException
*/
- abstract void initializeJob(JobInitializationContext context) throws IOException;
-
+ public abstract void initializeJob(String user, String jobid,
+ Path credentials, Path jobConf,
+ TaskUmbilicalProtocol taskTracker,
+ InetSocketAddress ttAddr)
+ throws IOException, InterruptedException;
+
/**
- * Take task-controller specific actions to initialize the distributed cache
- * file. This involves setting appropriate permissions for these files so as
- * to secure them to be accessible only their owners.
- *
- * @param context
+ * Create all of the directories for the task and launches the child jvm.
+ * @param user the user name
+ * @param jobId the jobId in question
+ * @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
+ * @param setup list of shell commands to execute before the jvm
+ * @param jvmArguments list of jvm arguments
+ * @param currentWorkDirectory the full path of the cwd for the task
+ * @param stdout the file to redirect stdout to
+ * @param stderr the file to redirect stderr to
+ * @return the exit code for the task
* @throws IOException
*/
- public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
- throws IOException;
-
- /**
- * Launch a task JVM
- *
- * This method defines how a JVM will be launched to run a task. Each
- * task-controller should also do an
- * {@link #initializeTask(TaskControllerContext)} inside this method so as to
- * initialize the task before launching it. This is for reasons of
- * task-controller specific optimizations w.r.t combining initialization and
- * launching of tasks.
- *
- * @param context the context associated to the task
- */
- abstract void launchTaskJVM(TaskControllerContext context)
- throws IOException;
-
+ public abstract
+ int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException;
+
/**
- * Top level cleanup a task JVM method.
- * <ol>
- * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
- * to cleanup.</li>
- * <li>Sends a forceful kill signal to task JVM, terminating all its
- * sub-processes forcefully.</li>
- * </ol>
- *
- * @param context the task for which kill signal has to be sent.
+ * Send a signal to a task pid as the user. Always signal the process group.
+ * An implementation may elect to signal the pid directly if the former is
+ * unavailable or fails.
+ * @param user the user name
+ * @param taskPid the pid of the task
+ * @param signal the id of the signal to send
+ * @return false if the process does not exist
+ * @throws IOException If the task controller failed to signal the process
+ * (group), but the process exists.
*/
- final void destroyTaskJVM(TaskControllerContext context) {
- // Send SIGTERM to try to ask for a polite exit.
- terminateTask(context);
-
- try {
- Thread.sleep(context.sleeptimeBeforeSigkill);
- } catch (InterruptedException e) {
- LOG.warn("Sleep interrupted : " +
- StringUtils.stringifyException(e));
- }
-
- killTask(context);
- }
-
- /** Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context)
- throws IOException;
-
- static class TaskExecContext {
- // task being executed
- Task task;
- }
+ public abstract boolean signalTask(String user, int taskPid,
+ Signal signal) throws IOException;
+
/**
- * Contains task information required for the task controller.
+ * Delete the user's files under all of the task tracker root directories.
+ * @param user the user name
+ * @param subDir the path relative to base directories
+ * @param baseDirs the base directories (absolute paths)
+ * @throws IOException
*/
- static class TaskControllerContext extends TaskExecContext {
- ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
-
- // Information used only when this context is used for launching new tasks.
- JvmEnv env; // the JVM environment for the task.
-
- // Information used only when this context is used for destroying a task jvm.
- String pid; // process handle of task JVM.
- long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
- }
-
+ public abstract void deleteAsUser(String user,
+ String subDir,
+ String... baseDirs) throws IOException;
+
/**
- * Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the file/dir
+ * Delete the user's files under the userlogs directory.
+ * @param user the user to work as
+ * @param subDir the path under the userlogs directory.
+ * @throws IOException
*/
- static abstract class TaskControllerPathDeletionContext
- extends PathDeletionContext {
- TaskController taskController;
- String user;
-
- /**
- * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir,
- * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir,
- * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId,
- * taskId, etc.
- */
- Path mapredLocalDir;
-
- public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
- TaskController taskController,
- String user) {
- super(fs, null);
- this.taskController = taskController;
- this.mapredLocalDir = mapredLocalDir;
+ public abstract void deleteLogAsUser(String user,
+ String subDir) throws IOException;
+
+ /**
+ * Cleanup-queue entry that deletes a path by delegating to a
+ * {@link TaskController}, acting as {@code user}. Log deletions go through
+ * {@link TaskController#deleteLogAsUser}; all other deletions go through
+ * {@link TaskController#deleteAsUser} with the given base directories.
+ */
+ static class DeletionContext extends CleanupQueue.PathDeletionContext {
+ private final TaskController controller;
+ // true: delete via deleteLogAsUser; false: via deleteAsUser(baseDirs)
+ private final boolean isLog;
+ private final String user;
+ private final String subDir;
+ // only consulted when isLog is false
+ private final String[] baseDirs;
+ DeletionContext(TaskController controller, boolean isLog, String user,
+ String subDir, String[] baseDirs) {
+ // deletePath() is overridden to delegate to the controller, so no
+ // FileSystem or full path is handed to the superclass.
+ super(null, null);
+ this.controller = controller;
+ this.isLog = isLog;
this.user = user;
+ this.subDir = subDir;
+ this.baseDirs = baseDirs;
}
-
+
@Override
- protected String getPathForCleanup() {
- if (fullPath == null) {
- fullPath = buildPathForDeletion();
+ protected void deletePath() throws IOException {
+ if (isLog) {
+ controller.deleteLogAsUser(user, subDir);
+ } else {
+ controller.deleteAsUser(user, subDir, baseDirs);
}
- return fullPath;
}
- /**
- * Return the component of the path under the {@link #mapredLocalDir} to be
- * cleaned up. Its the responsibility of the class that extends
- * {@link TaskControllerPathDeletionContext} to provide the correct
- * component. For example
- * - For task related cleanups, either the task-work-dir or task-local-dir
- * might be returned depending on jvm reuse.
- * - For job related cleanup, simply the job-local-dir might be returned.
- */
- abstract protected String getPath();
-
- /**
- * Builds the path of taskAttemptDir OR taskWorkDir based on
- * mapredLocalDir, jobId, taskId, etc
- */
- String buildPathForDeletion() {
- return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
+ @Override
+ public String toString() {
+ return (isLog ? "log(" : "dir(") +
+ user + "," + subDir + ")";
}
}
-
- /** Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the task-work-dir or
- * task-local-dir depending on whether the jvm is reused or not.
- */
- static class TaskControllerTaskPathDeletionContext
- extends TaskControllerPathDeletionContext {
- final Task task;
- final boolean isWorkDir;
-
- public TaskControllerTaskPathDeletionContext(FileSystem fs,
- Path mapredLocalDir, Task task, boolean isWorkDir,
- TaskController taskController) {
- super(fs, mapredLocalDir, taskController, task.getUser());
- this.task = task;
- this.isWorkDir = isWorkDir;
- }
-
- /**
- * Returns the taskWorkDir or taskLocalDir based on whether
- * {@link TaskControllerTaskPathDeletionContext} is configured to delete
- * the workDir.
- */
- @Override
- protected String getPath() {
- String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask())
- : TaskTracker.getLocalTaskDir(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask());
- return subDir;
+
+ /**
+ * Returns the local unix user that a given job will run as.
+ * <p>This base implementation does not consult {@code conf}; it simply
+ * reports the {@code user.name} system property of the current JVM.
+ */
+ public String getRunAsUser(JobConf conf) {
+ final String jvmUser = System.getProperty("user.name");
+ return jvmUser;
+ }
+
+ /**
+ * Writes the JVM command line to {@code commandFile} and returns its path.
+ * The JVM will be launched using a setuid executable, and the command line
+ * could contain strings defined by a user. To prevent special character
+ * attacks, the command line is written to a file which is then executed.
+ *
+ * @param cmdLine the complete command line to write
+ * @param fs the file system on which the command file is created
+ * @param commandFile the file to write the command line to
+ * @return the path component of the fully qualified {@code commandFile}
+ * @throws IOException if the file cannot be created, written or closed;
+ *         callers must not execute a script that was never written
+ */
+ protected static String writeCommand(String cmdLine, FileSystem fs,
+ Path commandFile) throws IOException {
+ LOG.info("Writing commands to " + commandFile);
+ // Creation failures propagate instead of being logged and ignored.
+ PrintWriter pw = new PrintWriter(FileSystem.create(
+ fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
+ try {
+ pw.write(cmdLine);
+ } finally {
+ pw.close();
}
-
- /**
- * Makes the path(and its subdirectories recursively) fully deletable by
- * setting proper permissions(770) by task-controller
- */
- @Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath, if not inited already
- if (fs.exists(new Path(fullPath))) {
- taskController.enableTaskForCleanup(this);
+ // PrintWriter never throws: write/flush/close failures only set its
+ // internal error flag, so check it explicitly.
+ if (pw.checkError()) {
+ throw new IOException("Error while writing JVM command line to "
+ + commandFile);
+ }
+ return commandFile.makeQualified(fs).toUri().getPath();
+ }
+
+ /**
+ * Logs {@code output} at INFO level, one log call per newline-separated
+ * line. A {@code null} argument is ignored.
+ */
+ protected void logOutput(String output) {
+ if (output != null) {
+ String[] outputLines = output.split("\n");
+ for (String line : outputLines) {
+ LOG.info(line);
}
}
}
- /** Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the job-local-dir.
- */
- static class TaskControllerJobPathDeletionContext
- extends TaskControllerPathDeletionContext {
- final JobID jobId;
-
- public TaskControllerJobPathDeletionContext(FileSystem fs,
- Path mapredLocalDir, JobID id, String user,
+ /** Whether "setsid" could be run on this machine, probed once at class load. */
+ public static final boolean isSetsidAvailable = isSetsidSupported();
+
+ /**
+ * Probes for setsid by running {@code setsid bash -c "echo $$"}.
+ *
+ * @return true if the probe ran without an IOException, false otherwise
+ */
+ private static boolean isSetsidSupported() {
+ final String[] probeCommand = {"setsid", "bash", "-c", "echo $$"};
+ final ShellCommandExecutor probe = new ShellCommandExecutor(probeCommand);
+ boolean supported;
+ try {
+ probe.execute();
+ supported = true;
+ } catch (IOException ioe) {
+ LOG.warn("setsid is not available on this machine. So not using it.");
+ supported = false;
+ } finally {
+ // The probe's exit code is only logged, whether or not it succeeded.
+ LOG.info("setsid exited with exit code " + probe.getExitCode());
+ }
+ return supported;
+ }
+
+ /**
+ * Non-daemon thread that sleeps for {@code delay} milliseconds and then asks
+ * the {@link TaskController} to send {@code signal} to process {@code pid}
+ * on behalf of {@code user}. Interrupting the thread while it sleeps
+ * cancels the signal.
+ */
+ public static class DelayedProcessKiller extends Thread {
+ private final String user;
+ private final int pid;
+ private final long delay;
+ private final Signal signal;
+ private final TaskController taskController;
+ /**
+ * @param user the user on whose behalf the signal is sent
+ * @param pid the id of the process to signal
+ * @param delay milliseconds to wait before signalling
+ * @param signal the signal to send
+ * @param taskController the controller used to deliver the signal
+ */
+ public DelayedProcessKiller(String user, int pid, long delay, Signal signal,
TaskController taskController) {
- super(fs, mapredLocalDir, taskController, user);
- this.jobId = id;
- }
-
- /**
- * Returns the jobLocalDir of the job to be cleaned up.
- */
- @Override
- protected String getPath() {
- return TaskTracker.getLocalJobDir(user, jobId.toString());
+ this.user = user;
+ this.pid = pid;
+ this.delay = delay;
+ this.signal = signal;
+ this.taskController = taskController;
+ setName("Task killer for " + pid);
+ setDaemon(false);
}
-
- /**
- * Makes the path(and its sub-directories recursively) fully deletable by
- * setting proper permissions(770) by task-controller
- */
+ /**
+ * Waits for the configured delay, then sends the signal. IOExceptions from
+ * the controller are logged, not rethrown.
+ */
@Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath, if not inited already
- if (fs.exists(new Path(fullPath))) {
- taskController.enableJobForCleanup(this);
+ public void run() {
+ try {
+ Thread.sleep(delay);
+ taskController.signalTask(user, pid, signal);
+ } catch (InterruptedException e) {
+ // Interrupted before the delay elapsed: skip the signal, but restore
+ // the interrupt status instead of silently discarding it.
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ LOG.warn("Exception when killing task " + pid, e);
}
}
}
-
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static class InitializationContext {
- public File workDir;
- public String user;
-
- public InitializationContext() {
- }
-
- public InitializationContext(String user, File workDir) {
- this.user = user;
- this.workDir = workDir;
- }
- }
-
- /**
- * This is used for initializing the private localized files in distributed
- * cache. Initialization would involve changing permission, ownership and etc.
- */
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static class DistributedCacheFileContext extends InitializationContext {
- // base directory under which file has been localized
- Path localizedBaseDir;
- // the unique string used to construct the localized path
- String uniqueString;
-
- public DistributedCacheFileContext(String user, File workDir,
- Path localizedBaseDir, String uniqueString) {
- super(user, workDir);
- this.localizedBaseDir = localizedBaseDir;
- this.uniqueString = uniqueString;
- }
-
- public Path getLocalizedUniqueDir() {
- return new Path(localizedBaseDir, new Path(TaskTracker
- .getPrivateDistributedCacheDir(user), uniqueString));
- }
- }
-
- static class JobInitializationContext extends InitializationContext {
- JobID jobid;
- }
-
- static class DebugScriptContext extends TaskExecContext {
- List<String> args;
- File workDir;
- File stdout;
- }
-
- /**
- * Sends a graceful terminate signal to taskJVM and it sub-processes.
- *
- * @param context task context
- */
- abstract void terminateTask(TaskControllerContext context);
-
- /**
- * Sends a KILL signal to forcefully terminate the taskJVM and its
- * sub-processes.
- *
- * @param context task context
- */
- abstract void killTask(TaskControllerContext context);
-
-
- /**
- * Sends a QUIT signal to direct the task JVM (and sub-processes) to
- * dump their stack to stdout.
- *
- * @param context task context.
- */
- abstract void dumpTaskStack(TaskControllerContext context);
- /**
- * Initialize user on this TaskTracer in a TaskController specific manner.
- *
- * @param context
- * @throws IOException
- */
- public abstract void initializeUser(InitializationContext context)
- throws IOException;
-
- /**
- * Launch the task debug script
- *
- * @param context
- * @throws IOException
- */
- abstract void runDebugScript(DebugScriptContext context)
- throws IOException;
-
- /**
- * Enable the task for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableTaskForCleanup(PathDeletionContext context)
- throws IOException;
-
- /**
- * Enable the job for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableJobForCleanup(PathDeletionContext context)
- throws IOException;
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Mar 8 05:56:27 2011
@@ -40,8 +40,8 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -152,7 +152,14 @@ public class TaskLog {
static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
String cleanupSuffix = isCleanup ? ".cleanup" : "";
- return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
+ // Delegate to the string-based overload so both share one layout rule.
+ return getAttemptDir(taskid.getJobID().toString(),
+ taskid.toString() + cleanupSuffix);
+ }
+
+ /**
+ * Get the log directory of a task attempt from string identifiers.
+ *
+ * @param jobid string form of the job id
+ * @param taskid string form of the attempt id, already carrying the
+ *        optional ".cleanup" suffix for cleanup attempts
+ * @return the {@code taskid} directory under the job's user log directory
+ */
+ static File getAttemptDir(String jobid, String taskid) {
+ // No suffix is appended here: taskid must already be the complete,
+ // fully formed directory name (including any ".cleanup" suffix).
+ return new File(getJobDir(jobid), taskid);
}
private static long prevOutLength;
private static long prevErrLength;
@@ -468,21 +475,23 @@ public class TaskLog {
String stdout = FileUtil.makeShellPath(stdoutFilename);
String stderr = FileUtil.makeShellPath(stderrFilename);
- StringBuffer mergedCmd = new StringBuffer();
+ StringBuilder mergedCmd = new StringBuilder();
// Export the pid of taskJvm to env variable JVM_PID.
// Currently pid is not used on Windows
if (!Shell.WINDOWS) {
- mergedCmd.append(" export JVM_PID=`echo $$` ; ");
+ mergedCmd.append("export JVM_PID=`echo $$` ; ");
}
- if (setup != null && setup.size() > 0) {
- mergedCmd.append(addCommand(setup, false));
- mergedCmd.append(";");
+ if (setup != null) {
+ for (String s : setup) {
+ mergedCmd.append(s);
+ mergedCmd.append("\n");
+ }
}
if (tailLength > 0) {
mergedCmd.append("(");
- } else if(ProcessTree.isSetsidAvailable && useSetsid &&
+ } else if(TaskController.isSetsidAvailable && useSetsid &&
!Shell.WINDOWS) {
mergedCmd.append("exec setsid ");
} else {
@@ -555,7 +564,7 @@ public class TaskLog {
*/
public static String addCommand(List<String> cmd, boolean isExecutable)
throws IOException {
- StringBuffer command = new StringBuffer();
+ StringBuilder command = new StringBuilder();
for(String s: cmd) {
command.append('\'');
if (isExecutable) {
@@ -604,11 +613,21 @@ public class TaskLog {
/**
* Get the user log directory for the job jobid.
*
- * @param jobid
+ * @param jobid string representation of the jobid; used verbatim as the
+ *        directory name under {@link #getUserLogDir()}
+ * @return user log directory for the job
+ */
+ public static File getJobDir(String jobid) {
+ return new File(getUserLogDir(), jobid);
+ }
+
+ /**
+ * Get the user log directory for the job jobid.
+ *
+ * @param jobid the jobid object; resolved through its {@code toString()} form
* @return user log directory for the job
*/
public static File getJobDir(JobID jobid) {
- return new File(getUserLogDir(), jobid.toString());
+ return getJobDir(jobid.toString());
}
} // TaskLog