Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC
svn commit: r1077679 [5/6] - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/core/org/apache/hadoop/fs/ src/core/org/apa...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 04:43:33 2011
@@ -55,6 +55,7 @@ import javax.servlet.http.HttpServletRes
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.server.tasktracker.*;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
@@ -66,17 +67,15 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.DeletionContext;
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -157,14 +156,15 @@ public class TaskTracker implements MRCo
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
- // each job at job localization time. This will be used by TaskLogServlet for
- // authorizing viewing of task logs of that job
+ //Job ACLs file is created by TaskController under userlogs/$jobid directory
+ //for each job at job localization time. This will be used by TaskLogServlet
+ //for authorizing viewing of task logs of that job
static String jobACLsFile = "job-acls.xml";
volatile boolean running = true;
private LocalDirAllocator localDirAllocator;
+ private String[] localdirs;
String taskTrackerName;
String localHostname;
InetSocketAddress jobTrackAddr;
@@ -242,9 +242,11 @@ public class TaskTracker implements MRCo
static final String DISTCACHEDIR = "distcache";
static final String JOBCACHE = "jobcache";
static final String OUTPUT = "output";
- private static final String JARSDIR = "jars";
+ static final String JARSDIR = "jars";
static final String LOCAL_SPLIT_FILE = "split.info";
static final String JOBFILE = "job.xml";
+ static final String TT_PRIVATE_DIR = "ttprivate";
+ static final String JVM_EXTRA_ENV_FILE = "jvm.extra.env";
static final String JOB_LOCAL_DIR = "job.local.dir";
static final String JOB_TOKEN_FILE="jobToken"; //localized file
@@ -440,18 +442,28 @@ public class TaskTracker implements MRCo
static String getLocalJobConfFile(String user, String jobid) {
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
}
+
+ static String getPrivateDirJobConfFile(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+ }
static String getTaskConfFile(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+ Path.SEPARATOR + TaskTracker.JOBFILE;
}
+
+ static String getPrivateDirTaskScriptLocation(String user, String jobid,
+ String taskid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR +
+ getLocalTaskDir(user, jobid, taskid);
+ }
static String getJobJarsDir(String user, String jobid) {
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
}
- static String getJobJarFile(String user, String jobid) {
+ public static String getJobJarFile(String user, String jobid) {
return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
}
@@ -470,7 +482,8 @@ public class TaskTracker implements MRCo
+ TaskTracker.OUTPUT;
}
- static String getLocalTaskDir(String user, String jobid, String taskid) {
+ public static String getLocalTaskDir(String user, String jobid,
+ String taskid) {
return getLocalTaskDir(user, jobid, taskid, false);
}
@@ -492,6 +505,15 @@ public class TaskTracker implements MRCo
static String getLocalJobTokenFile(String user, String jobid) {
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
}
+
+ static String getPrivateDirJobTokenFile(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR +
+ getLocalJobTokenFile(user, jobid);
+ }
+
+ static String getPrivateDirForJob(String user, String jobid) {
+ return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid) ;
+ }
private FileSystem getFS(final Path filePath, JobID jobId,
final Configuration conf) throws IOException, InterruptedException {
@@ -522,6 +544,23 @@ public class TaskTracker implements MRCo
}
}
+ /**
+ * Delete all of the user directories.
+ * @param conf the TT configuration
+ * @throws IOException
+ */
+ private void deleteUserDirectories(Configuration conf) throws IOException {
+ for(String root: localdirs) {
+ for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+ String owner = status.getOwner();
+ String path = status.getPath().getName();
+ if (path.equals(owner)) {
+ taskController.deleteAsUser(owner, "");
+ }
+ }
+ }
+ }
+
public static final String TT_USER_NAME = "mapreduce.tasktracker.kerberos.principal";
public static final String TT_KEYTAB_FILE =
"mapreduce.tasktracker.keytab.file";
@@ -548,8 +587,18 @@ public class TaskTracker implements MRCo
}
//check local disk
- checkLocalDirs(this.fConf.getLocalDirs());
+ checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+ deleteUserDirectories(fConf);
fConf.deleteLocalFiles(SUBDIR);
+ final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+ for (String s : localdirs) {
+ localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+ }
+ fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+ final FsPermission priv = FsPermission.createImmutable((short) 0700);
+ for (String s : localdirs) {
+ localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+ }
// Clear out state tables
this.tasks.clear();
@@ -608,14 +657,6 @@ public class TaskTracker implements MRCo
this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);
- Class<? extends TaskController> taskControllerClass = fConf.getClass(
- "mapred.task.tracker.task-controller", DefaultTaskController.class, TaskController.class);
- taskController = (TaskController) ReflectionUtils.newInstance(
- taskControllerClass, fConf);
-
- // setup and create jobcache directory with appropriate permissions
- taskController.setup();
-
// Initialize DistributedCache
this.distributedCacheManager = new TrackerDistributedCacheManager(
this.fConf, taskController);
@@ -651,7 +692,7 @@ public class TaskTracker implements MRCo
reduceLauncher.start();
// create a localizer instance
- setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+ setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
//Start up node health checker service.
if (shouldStartHealthMonitor(this.fConf)) {
@@ -897,25 +938,18 @@ public class TaskTracker implements MRCo
JobID jobId = t.getJobID();
RunningJob rjob = addTaskToJob(jobId, tip);
- // Initialize the user directories if needed.
- getLocalizer().initializeUserDirs(t.getUser());
-
synchronized (rjob) {
if (!rjob.localized) {
- JobConf localJobConf = localizeJobFiles(t, rjob);
- // initialize job log directory
- initializeJobLogDir(jobId, localJobConf);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir. Note that initializeJob
- // should be the last call after every other directory/file to be
- // directly under the job directory is created.
- JobInitializationContext context = new JobInitializationContext();
- context.jobid = jobId;
- context.user = t.getUser();
- context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
- taskController.initializeJob(context);
-
+ Path localJobConfPath = initializeJob(t, rjob);
+ JobConf localJobConf = new JobConf(localJobConfPath);
+ //to be doubly sure, overwrite the user in the config with the one the TT
+ //thinks it is
+ localJobConf.setUser(t.getUser());
+ //also reset the #tasks per jvm
+ resetNumTasksPerJvm(localJobConf);
+ //set the base jobconf path in rjob; all tasks will use
+ //this as the base path when they run
+ rjob.localizedJobConf = localJobConfPath;
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
@@ -930,35 +964,31 @@ public class TaskTracker implements MRCo
* Localize the job on this tasktracker. Specifically
* <ul>
* <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the credentials file</li>
* <li>Download the job config file job.xml from the FS</li>
- * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
- * in the configuration.
- * <li>Download the job jar file job.jar from the FS, unjar it and set jar
- * file in the configuration.</li>
+ * <li>Invokes the {@link TaskController} to do the rest of the job
+ * initialization</li>
* </ul>
*
* @param t task whose job has to be localized on this TT
- * @return the modified job configuration to be used for all the tasks of this
- * job as a starting point.
+ * @param rjob the {@link RunningJob}
+ * @return the path to the job configuration to be used for all the tasks
+ * of this job as a starting point.
* @throws IOException
*/
- JobConf localizeJobFiles(Task t, RunningJob rjob)
- throws IOException, InterruptedException {
- JobID jobId = t.getJobID();
+ Path initializeJob(final Task t, RunningJob rjob)
+ throws IOException, InterruptedException {
+ final JobID jobId = t.getJobID();
+
+ final Path jobFile = new Path(t.getJobFile());
+ final String userName = t.getUser();
+ final Configuration conf = getJobConf();
- Path jobFile = new Path(t.getJobFile());
- String userName = t.getUser();
- JobConf userConf = new JobConf(getJobConf());
-
- // Initialize the job directories first
- FileSystem localFs = FileSystem.getLocal(fConf);
- getLocalizer().initializeJobDirs(userName, jobId);
-
// save local copy of JobToken file
- String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+ final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
-
- Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+
+ Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
if (jt != null) { //could be null in the case of some unit tests
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -967,107 +997,108 @@ public class TaskTracker implements MRCo
rjob.ugi.addToken(token);
}
- FileSystem userFs = getFS(jobFile, jobId, userConf);
+ FileSystem userFs = getFS(jobFile, jobId, conf);
// Download the job.xml for this job from the system FS
- Path localJobFile =
- localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
-
+ final Path localJobFile =
+ localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
JobConf localJobConf = new JobConf(localJobFile);
- //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
- //AS PART OF THE TASK OBJECT
- localJobConf.setUser(userName);
-
- // set the location of the token file into jobConf to transfer
- // the name to TaskRunner
- localJobConf.set(TokenCache.JOB_TOKENS_FILENAME, localJobTokenFile);
- // create the 'job-work' directory: job-specific shared directory for use as
- // scratch space by all tasks of the same job running on this TaskTracker.
- Path workDir =
- lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName,
- jobId.toString()), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
- localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
-
- // Download the job.jar for this job from the system FS
- localizeJobJarFile(userName, jobId, userFs, localJobConf);
-
- return localJobConf;
- }
-
- // Create job userlog dir.
- // Create job acls file in job log dir, if needed.
- void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+
+ // Setup the public distributed cache
+ TaskDistributedCacheManager taskDistributedCacheManager =
+ getTrackerDistributedCacheManager()
+ .newTaskDistributedCacheManager(jobId, localJobConf);
+ rjob.distCacheMgr = taskDistributedCacheManager;
+ taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+
+ // Set some config values
+ localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ if (conf.get("slave.host.name") != null) {
+ localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+ }
+ resetNumTasksPerJvm(localJobConf);
+ localJobConf.setUser(t.getUser());
+
+ // write back the config (this config will have the updates that the
+ // distributed cache manager makes as well)
+ JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+
+ /**
+ * Now initialize the job via task-controller to do the rest of the
+ * job-init. Do this within a doAs since the distributed cache is also set
+ * up in {@link TaskController#initializeJob(String, JobID, Path, Path)}
+ * To support potential authenticated HDFS accesses, we need the tokens
+ */
+ rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException {
+ try {
+ taskController.initializeJob(t.getUser(), jobId.toString(),
+ new Path(localJobTokenFile), localJobFile, TaskTracker.this);
+ } catch (IOException e) {
+ try {
+ // called holding lock on RunningJob
+ removeJobFiles(t.getUser(), jobId);
+ } catch (IOException e2) {
+ LOG.warn("Failed to add " + jobId + " to cleanup queue", e2);
+ }
+ throw e;
+ }
+ return null;
+ }
+ });
+ //search for the conf that the initializeJob created
+ //need to look up certain configs from this conf, like
+ //the distributed cache, profiling, etc. ones
+ Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+ userName, jobId.toString()), getJobConf());
+ return initializedConf;
+ }
+
+ /** If certain configs are enabled, the jvm-reuse should be disabled
+ * @param localJobConf
+ */
+ static void resetNumTasksPerJvm(JobConf localJobConf) {
+ boolean debugEnabled = false;
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ return;
+ }
+ if (localJobConf.getMapDebugScript() != null ||
+ localJobConf.getReduceDebugScript() != null) {
+ debugEnabled = true;
+ }
+ String keepPattern = localJobConf.getKeepTaskFilesPattern();
+
+ if (debugEnabled || localJobConf.getProfileEnabled() ||
+ keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+ //disable jvm reuse
+ localJobConf.setNumTasksToExecutePerJvm(1);
+ }
+ }
+
+ // Remove the log dir from the tasklog cleanup thread
+ void saveLogDir(JobID jobId, JobConf localJobConf)
throws IOException {
// remove it from tasklog cleanup thread first,
// it might be added there because of tasktracker reinit or restart
JobStartedEvent jse = new JobStartedEvent(jobId);
getUserLogManager().addLogEvent(jse);
- localizer.initializeJobLogDir(jobId);
-
- if (areACLsEnabled()) {
- // Create job-acls.xml file in job userlog dir and write the needed
- // info for authorization of users for viewing task logs of this job.
- writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
- }
- }
-
- /**
- * Creates job-acls.xml under the given directory logDir and writes
- * job-view-acl, queue-admins-acl, jobOwner name and queue name into this
- * file.
- * queue name is the queue to which the job was submitted to.
- * queue-admins-acl is the queue admins ACL of the queue to which this
- * job was submitted to.
- * @param conf job configuration
- * @param logDir job userlog dir
- * @throws IOException
- */
- private static void writeJobACLs(JobConf conf, File logDir) throws IOException {
- File aclFile = new File(logDir, jobACLsFile);
- JobConf aclConf = new JobConf(false);
-
- // set the job view acl in aclConf
- String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
- aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
-
- // set the job queue name in aclConf
- String queue = conf.getQueueName();
- aclConf.setQueueName(queue);
-
- // set the queue admins acl in aclConf
- String qACLName = QueueManager.toFullPropertyName(queue,
- QueueACL.ADMINISTER_JOBS.getAclName());
- String queueAdminsACL = conf.get(qACLName, " ");
- aclConf.set(qACLName, queueAdminsACL);
-
- // set jobOwner as user.name in aclConf
- String jobOwner = conf.getUser();
- aclConf.set("user.name", jobOwner);
-
- FileOutputStream out = new FileOutputStream(aclFile);
- try {
- aclConf.writeXml(out);
- } finally {
- out.close();
- }
- Localizer.PermissionsHandler.setPermissions(aclFile,
- Localizer.PermissionsHandler.sevenZeroZero);
}
+
/**
* Download the job configuration file from the FS.
*
- * @param t Task whose job file has to be downloaded
- * @param jobId jobid of the task
+ * @param jobFile the original location of the configuration file
+ * @param user the user in question
+ * @param userFs the FileSystem created on behalf of the user
+ * @param jobId jobid in question
* @return the local file system path of the downloaded file.
* @throws IOException
*/
- private Path localizeJobConfFile(Path jobFile, String user, FileSystem userFs, JobID jobId)
+ private Path localizeJobConfFile(Path jobFile, String user,
+ FileSystem userFs, JobID jobId)
throws IOException {
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
@@ -1080,7 +1111,7 @@ public class TaskTracker implements MRCo
jobFileSize = -1;
}
Path localJobFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user,
+ lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
jobId.toString()), jobFileSize, fConf);
// Download job.xml
@@ -1088,58 +1119,16 @@ public class TaskTracker implements MRCo
return localJobFile;
}
- /**
- * Download the job jar file from FS to the local file system and unjar it.
- * Set the local jar file in the passed configuration.
- *
- * @param jobId
- * @param userFs
- * @param localJobConf
- * @throws IOException
- */
- private void localizeJobJarFile(String user, JobID jobId, FileSystem userFs,
- JobConf localJobConf)
- throws IOException {
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- FileStatus status = null;
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- try {
- status = userFs.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch (FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for five times the size of jarFileSize to accommodate for
- // unjarring the jar file in the jars directory
- Path localJarFile =
- lDirAlloc.getLocalPathForWrite(
- getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
-
- //Download job.jar
- userFs.copyToLocalFile(jarFilePath, localJarFile);
-
- localJobConf.setJar(localJarFile.toString());
-
- // Also un-jar the job.jar files. We un-jar it so that classes inside
- // sub-directories, for e.g., lib/, classes/ are available on class-path
- RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
- .getParent().toString()));
- }
- }
-
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
- UserGroupInformation ugi) throws IOException {
+ RunningJob rjob) throws IOException {
synchronized (tip) {
tip.setJobConf(jobConf);
- tip.setUGI(ugi);
- tip.launchTask();
+ tip.setUGI(rjob.ugi);
+ tip.launchTask(rjob);
}
}
- public synchronized void shutdown() throws IOException {
+ public synchronized void shutdown() throws IOException, InterruptedException {
shuttingDown = true;
close();
if (this.server != null) {
@@ -1156,8 +1145,9 @@ public class TaskTracker implements MRCo
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
+ * @throws InterruptedException
*/
- public synchronized void close() throws IOException {
+ public synchronized void close() throws IOException, InterruptedException {
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1246,8 +1236,15 @@ public class TaskTracker implements MRCo
// objects
FileSystem local = FileSystem.getLocal(conf);
this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ Class<? extends TaskController> taskControllerClass =
+ conf.getClass("mapred.task.tracker.task-controller",
+ DefaultTaskController.class, TaskController.class);
+ taskController =
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+ taskController.setup(localDirAllocator);
+
// create user log manager
- setUserLogManager(new UserLogManager(conf));
+ setUserLogManager(new UserLogManager(conf, taskController));
SecurityUtil.login(originalConf, TT_KEYTAB_FILE, TT_USER_NAME);
initialize();
@@ -1688,70 +1685,6 @@ public class TaskTracker implements MRCo
}
/**
- * Builds list of PathDeletionContext objects for the given paths
- */
- private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
- Path[] paths) {
- int i = 0;
- PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
- }
- return contexts;
- }
-
- /**
- * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a
- * job each pointing to the job's jobLocalDir.
- * @param fs : FileSystem in which the dirs to be deleted
- * @param paths : mapred-local-dirs
- * @param id : {@link JobID} of the job for which the local-dir needs to
- * be cleaned up.
- * @param user : Job owner's username
- * @param taskController : the task-controller to be used for deletion of
- * jobLocalDir
- */
- static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
- FileSystem fs, Path[] paths, JobID id, String user,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
- taskController);
- }
- return contexts;
- }
-
- /**
- * Builds list of TaskControllerTaskPathDeletionContext objects for a task
- * @param fs : FileSystem in which the dirs to be deleted
- * @param paths : mapred-local-dirs
- * @param task : the task whose taskDir or taskWorkDir is going to be deleted
- * @param isWorkDir : the dir to be deleted is workDir or taskDir
- * @param taskController : the task-controller to be used for deletion of
- * taskDir or taskWorkDir
- */
- static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
- FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
- isWorkDir, taskController);
- }
- return contexts;
- }
-
- /**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
* @throws IOException
@@ -1778,7 +1711,7 @@ public class TaskTracker implements MRCo
}
// Delete the job directory for this
// task if the job is done/failed
- if (!rjob.keepJobFiles) {
+ if (!rjob.keepJobFiles && rjob.localized) {
removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
}
// add job to user log manager
@@ -1812,12 +1745,21 @@ public class TaskTracker implements MRCo
* @param rjob
* @throws IOException
*/
- void removeJobFiles(String user, JobID jobId)
- throws IOException {
- PathDeletionContext[] contexts =
- buildTaskControllerJobPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), jobId, user, taskController);
- directoryCleanupThread.addToQueue(contexts);
+ void removeJobFiles(String user, JobID jobId) throws IOException {
+ String userDir = getUserDir(user);
+ String jobDir = getLocalJobDir(user, jobId.toString());
+ PathDeletionContext jobCleanup =
+ new TaskController.DeletionContext(getTaskController(), false, user,
+ jobDir.substring(userDir.length()));
+ directoryCleanupThread.addToQueue(jobCleanup);
+
+ for (String str : localdirs) {
+ Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+ new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+ PathDeletionContext ttPrivateJobCleanup =
+ new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+ directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+ }
}
/**
@@ -2113,12 +2055,14 @@ public class TaskTracker implements MRCo
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
+ * @throws InterruptedException
*/
- void startNewTask(TaskInProgress tip) {
+ void startNewTask(TaskInProgress tip) throws InterruptedException {
try {
RunningJob rjob = localizeJob(tip);
+ tip.getTask().setJobFile(rjob.localizedJobConf.toString());
// Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
- launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi);
+ launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob);
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -2128,8 +2072,9 @@ public class TaskTracker implements MRCo
tip.kill(true);
tip.cleanup(true);
} catch (IOException ie2) {
- LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
- StringUtils.stringifyException(ie2));
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ } catch (InterruptedException ie2) {
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
}
// Careful!
@@ -2238,7 +2183,7 @@ public class TaskTracker implements MRCo
private TaskRunner runner;
volatile boolean done = false;
volatile boolean wasKilled = false;
- private JobConf defaultJobConf;
+ private JobConf ttConf;
private JobConf localJobConf;
private boolean keepFailedTaskFiles;
private boolean alwaysKeepTaskFiles;
@@ -2270,7 +2215,7 @@ public class TaskTracker implements MRCo
this.task = task;
this.launcher = launcher;
this.lastProgressReport = System.currentTimeMillis();
- this.defaultJobConf = conf;
+ this.ttConf = conf;
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
0.0f,
@@ -2289,69 +2234,10 @@ public class TaskTracker implements MRCo
void localizeTask(Task task) throws IOException{
- FileSystem localFs = FileSystem.getLocal(fConf);
-
- // create taskDirs on all the disks.
- getLocalizer().initializeAttemptDirs(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask());
-
- // create the working-directory of the task
- Path cwd =
- lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), defaultJobConf);
- if (!localFs.mkdirs(cwd)) {
- throw new IOException("Mkdirs failed to create "
- + cwd.toString());
- }
-
- localJobConf.set("mapred.local.dir",
- fConf.get("mapred.local.dir"));
-
- if (fConf.get("slave.host.name") != null) {
- localJobConf.set("slave.host.name",
- fConf.get("slave.host.name"));
- }
-
- keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-
// Do the task-type specific localization
+//TODO: are these calls really required
task.localizeConfiguration(localJobConf);
- List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
- if (staticResolutions != null && staticResolutions.size() > 0) {
- StringBuffer str = new StringBuffer();
-
- for (int i = 0; i < staticResolutions.size(); i++) {
- String[] hostToResolved = staticResolutions.get(i);
- str.append(hostToResolved[0]+"="+hostToResolved[1]);
- if (i != staticResolutions.size() - 1) {
- str.append(',');
- }
- }
- localJobConf.set("hadoop.net.static.resolutions", str.toString());
- }
- if (task.isMapTask()) {
- debugCommand = localJobConf.getMapDebugScript();
- } else {
- debugCommand = localJobConf.getReduceDebugScript();
- }
- String keepPattern = localJobConf.getKeepTaskFilesPattern();
- if (keepPattern != null) {
- alwaysKeepTaskFiles =
- Pattern.matches(keepPattern, task.getTaskID().toString());
- } else {
- alwaysKeepTaskFiles = false;
- }
- if (debugCommand != null || localJobConf.getProfileEnabled() ||
- alwaysKeepTaskFiles || keepFailedTaskFiles) {
- //disable jvm reuse
- localJobConf.setNumTasksToExecutePerJvm(1);
- }
- if (isTaskMemoryManagerEnabled()) {
- localJobConf.setBoolean("task.memory.mgmt.enabled", true);
- }
task.setConf(localJobConf);
}
@@ -2374,6 +2260,18 @@ public class TaskTracker implements MRCo
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
taskTimeout = localJobConf.getLong("mapred.task.timeout",
10 * 60 * 1000);
+ if (task.isMapTask()) {
+ debugCommand = localJobConf.getMapDebugScript();
+ } else {
+ debugCommand = localJobConf.getReduceDebugScript();
+ }
+ String keepPattern = localJobConf.getKeepTaskFilesPattern();
+ if (keepPattern != null) {
+ alwaysKeepTaskFiles =
+ Pattern.matches(keepPattern, task.getTaskID().toString());
+ } else {
+ alwaysKeepTaskFiles = false;
+ }
}
public synchronized JobConf getJobConf() {
@@ -2394,7 +2292,7 @@ public class TaskTracker implements MRCo
/**
* Kick off the task execution
*/
- public synchronized void launchTask() throws IOException {
+ public synchronized void launchTask(RunningJob rjob) throws IOException {
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
@@ -2402,7 +2300,7 @@ public class TaskTracker implements MRCo
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
- setTaskRunner(task.createRunner(TaskTracker.this, this));
+ setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
@@ -2466,8 +2364,10 @@ public class TaskTracker implements MRCo
"Groups=" + taskCounters.getGroupNames().size() + " Limit=" +
Counters.MAX_GROUP_LIMIT);
kill(true);
- } catch(IOException ie) {
- LOG.error("Error killing task " + task.getTaskID(), ie);
+ } catch (IOException e) {
+ LOG.error("Error killing task " + task.getTaskID(), e);
+ } catch (InterruptedException e) {
+ LOG.error("Error killing task " + task.getTaskID(), e);
}
}
@@ -2828,7 +2728,12 @@ public class TaskTracker implements MRCo
getRunState() == TaskStatus.State.UNASSIGNED ||
getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
- kill(wasFailure);
+ try {
+ kill(wasFailure);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while killing " +
+ getTask().getTaskID(), e);
+ }
}
}
@@ -2839,8 +2744,10 @@ public class TaskTracker implements MRCo
/**
* Something went wrong and the task must be killed.
* @param wasFailure was it a failure (versus a kill request)?
+ * @throws InterruptedException
*/
- public synchronized void kill(boolean wasFailure) throws IOException {
+ public synchronized void kill(boolean wasFailure
+ ) throws IOException, InterruptedException {
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
@@ -2942,7 +2849,7 @@ public class TaskTracker implements MRCo
return;
}
try {
- removeTaskFiles(needCleanup, taskId);
+ removeTaskFiles(needCleanup);
} catch (Throwable ie) {
LOG.info("Error cleaning up task runner: "
+ StringUtils.stringifyException(ie));
@@ -2954,47 +2861,27 @@ public class TaskTracker implements MRCo
* Some or all of the files from this task are no longer required. Remove
* them via CleanupQueue.
*
- * @param needCleanup
+ * @param removeOutputs whether to remove the entire task directory (including the task output) rather than just the work directory
* @param taskId
* @throws IOException
*/
- void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
- throws IOException {
- if (needCleanup) {
- if (runner != null) {
- // cleans up the output directory of the task (where map outputs
- // and reduce inputs get stored)
- runner.close();
- }
-
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- // No jvm reuse, remove everything
- PathDeletionContext[] contexts =
- buildTaskControllerTaskPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), task, false/* not workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ void removeTaskFiles(boolean removeOutputs) throws IOException {
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ String user = ugi.getShortUserName();
+ int userDirLen = TaskTracker.getUserDir(user).length();
+ String jobId = task.getJobID().toString();
+ String taskId = task.getTaskID().toString();
+ boolean cleanup = task.isTaskCleanupTask();
+ String taskDir;
+ if (!removeOutputs) {
+ taskDir = TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);
} else {
- // Jvm reuse. We don't delete the workdir since some other task
- // (running in the same JVM) might be using the dir. The JVM
- // running the tasks would clean the workdir per a task in the
- // task process itself.
- String localTaskDir =
- getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
- .toString(), task.isTaskCleanupTask());
- PathDeletionContext[] contexts = buildPathDeletionContexts(
- localFs, getLocalFiles(defaultJobConf, localTaskDir +
- Path.SEPARATOR + TaskTracker.JOBFILE));
- directoryCleanupThread.addToQueue(contexts);
- }
- } else {
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- PathDeletionContext[] contexts =
- buildTaskControllerTaskPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), task, true /* workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
}
+ PathDeletionContext item =
+ new TaskController.DeletionContext(taskController, false, user,
+ taskDir.substring(userDirLen));
+ directoryCleanupThread.addToQueue(item);
}
}
@@ -3033,7 +2920,11 @@ public class TaskTracker implements MRCo
if (rjob == null) { //kill the JVM since the job is dead
LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
" is dead");
- jvmManager.killJvm(jvmId);
+ try {
+ jvmManager.killJvm(jvmId);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to kill " + jvmId, e);
+ }
return new JvmTask(null, true);
}
TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
@@ -3226,12 +3117,15 @@ public class TaskTracker implements MRCo
static class RunningJob{
private JobID jobid;
private JobConf jobConf;
+ private Path localizedJobConf;
// keep this for later use
volatile Set<TaskInProgress> tasks;
boolean localized;
boolean keepJobFiles;
UserGroupInformation ugi;
FetchStatus f;
+ TaskDistributedCacheManager distCacheMgr;
+
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -3850,7 +3744,7 @@ public class TaskTracker implements MRCo
jobTokenSize = status.getLen();
Path localJobTokenFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
+ lDirAlloc.getLocalPathForWrite(getPrivateDirJobTokenFile(user,
jobId.toString()), jobTokenSize, fConf);
String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
@@ -3936,4 +3830,13 @@ public class TaskTracker implements MRCo
return map;
}
// End MXBean implementation
+
+ @Override
+ public void
+ updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes
+ ) throws IOException {
+ distributedCacheManager.setArchiveSizes(jobId, sizes);
+ }
+
}
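
Aside on the jvm-reuse change above: the new resetNumTasksPerJvm() is package-private, so the minimal sketch below only mirrors its guard under a hypothetical class and method name. The JobConf accessors are real APIs; the behavior shown is inferred from the hunk above, not a verbatim copy of the committed method.

import org.apache.hadoop.mapred.JobConf;

// Hypothetical sketch: JVM reuse is forced off whenever debug scripts,
// profiling, a keep-task-files pattern, or keep-failed-task-files are set.
public class JvmReuseGuardSketch {
  static void disableJvmReuseIfNeeded(JobConf conf) {
    if (conf.getNumTasksToExecutePerJvm() == 1) {
      return;                                    // reuse already disabled
    }
    boolean debugEnabled = conf.getMapDebugScript() != null
        || conf.getReduceDebugScript() != null;
    if (debugEnabled || conf.getProfileEnabled()
        || conf.getKeepTaskFilesPattern() != null
        || conf.getKeepFailedTaskFiles()) {
      conf.setNumTasksToExecutePerJvm(1);        // one task per JVM
    }
  }

  public static void main(String[] args) {
    JobConf conf = new JobConf(false);
    conf.setNumTasksToExecutePerJvm(-1);         // request unlimited reuse
    conf.setProfileEnabled(true);                // profiling disables reuse
    disableJvmReuseIfNeeded(conf);
    System.out.println(conf.getNumTasksToExecutePerJvm());  // prints 1
  }
}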
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Mar 4 04:43:33 2011
@@ -157,4 +157,13 @@ public interface TaskUmbilicalProtocol e
TaskAttemptID id)
throws IOException;
+ /**
+ * The job initializer needs to report the sizes of the archive
+ * objects in the private distributed cache.
+ * @param jobId the job to update
+ * @param sizes the array of sizes that were computed
+ * @throws IOException
+ */
+ void updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes) throws IOException;
}
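
The new umbilical method above lets the job-localization step report private distributed-cache sizes back to the TaskTracker. A minimal caller sketch follows; the class and method name are invented, and only the protocol method and parameter types come from this patch.

import java.io.IOException;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.JobID;

// Hypothetical reporter: after downloading the private cache for a job, push
// the measured archive/file sizes (in bytes) back over the umbilical so the
// TaskTracker can account for the local disk space they occupy.
public class CacheSizeReporterSketch {
  static void reportSizes(TaskUmbilicalProtocol umbilical, JobID jobId,
                          long[] sizes) throws IOException {
    umbilical.updatePrivateDistributedCacheSizes(jobId, sizes);
  }
}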
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Mar 4 04:43:33 2011
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -118,7 +119,7 @@ public class UserLogCleaner extends Thre
}
public void deleteJobLogs(JobID jobid) throws IOException {
- deleteLogPath(TaskLog.getJobDir(jobid).getAbsolutePath());
+ deleteLogPath(jobid.toString());
}
/**
@@ -134,26 +135,22 @@ public class UserLogCleaner extends Thre
public void clearOldUserLogs(Configuration conf) throws IOException {
File userLogDir = TaskLog.getUserLogDir();
if (userLogDir.exists()) {
- String[] logDirs = userLogDir.list();
- if (logDirs.length > 0) {
+ long now = clock.getTime();
+ for(String logDir: userLogDir.list()) {
// add all the log dirs to taskLogsMonitor.
- long now = clock.getTime();
- for (String logDir : logDirs) {
- JobID jobid = null;
- try {
- jobid = JobID.forName(logDir);
- } catch (IllegalArgumentException ie) {
- // if the directory is not a jobid, delete it immediately
- deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
- continue;
- }
- // add the job log directory for deletion with default retain hours,
- // if it is not already added
- if (!completedJobs.containsKey(jobid)) {
- JobCompletedEvent jce = new JobCompletedEvent(jobid, now,
- getUserlogRetainHours(conf));
- userLogManager.addLogEvent(jce);
- }
+ JobID jobid = null;
+ try {
+ jobid = JobID.forName(logDir);
+ } catch (IllegalArgumentException ie) {
+ deleteLogPath(logDir);
+ continue;
+ }
+ // add the job log directory for deletion with default retain hours,
+ // if it is not already added
+ if (!completedJobs.containsKey(jobid)) {
+ JobCompletedEvent jce =
+ new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
+ userLogManager.addLogEvent(jce);
}
}
}
@@ -208,7 +205,11 @@ public class UserLogCleaner extends Thre
*/
private void deleteLogPath(String logPath) throws IOException {
LOG.info("Deleting user log path " + logPath);
- PathDeletionContext context = new PathDeletionContext(localFs, logPath);
- cleanupQueue.addToQueue(context);
+ String logRoot = TaskLog.getUserLogDir().toString();
+ String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+ TaskController controller = userLogManager.getTaskController();
+ PathDeletionContext item =
+ new TaskController.DeletionContext(controller, true, user, logPath);
+ cleanupQueue.addToQueue(item);
}
}
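
UserLogCleaner now deletes a userlog directory as its owner: it looks up the directory owner on the local filesystem and enqueues a TaskController.DeletionContext for that user. The sketch below shows only the ownership lookup; the userlog root and job id are illustrative, and the class name is invented.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: the user whose task-controller should delete a userlog directory is
// simply the local owner of that directory.
public class LogOwnerLookupSketch {
  public static void main(String[] args) throws Exception {
    FileSystem localFs = FileSystem.getLocal(new Configuration());
    Path jobLogDir =
        new Path("/var/log/hadoop/userlogs", "job_201103040443_0001");
    String owner = localFs.getFileStatus(jobLogDir).getOwner();
    // deleteLogPath() would now enqueue a DeletionContext for this owner
    System.out.println("delete " + jobLogDir + " as " + owner);
  }
}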
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar 4 04:43:33 2011
@@ -156,7 +156,7 @@ public class TokenCache {
* @throws IOException
*/
//@InterfaceAudience.Private
- public static Credentials loadTokens(String jobTokenFile, JobConf conf)
+ public static Credentials loadTokens(String jobTokenFile, Configuration conf)
throws IOException {
Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 04:43:33 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
import org.apache.hadoop.mapreduce.JobID;
/**
@@ -44,19 +43,16 @@ public class Localizer {
private FileSystem fs;
private String[] localDirs;
- private TaskController taskController;
/**
* Create a Localizer instance
*
* @param fileSys
* @param lDirs
- * @param tc
*/
- public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+ public Localizer(FileSystem fileSys, String[] lDirs) {
fs = fileSys;
localDirs = lDirs;
- taskController = tc;
}
/**
@@ -264,13 +260,6 @@ public class Localizer {
+ user);
}
- // Now, run the task-controller specific code to initialize the
- // user-directories.
- InitializationContext context = new InitializationContext();
- context.user = user;
- context.workDir = null;
- taskController.initializeUser(context);
-
// Localization of the user is done
localizedUser.set(true);
}
@@ -283,7 +272,7 @@ public class Localizer {
* <br>
* Here, we set 700 permissions on the job directories created on all disks.
* This we do so as to avoid any misuse by other users till the time
- * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+ * {@link TaskController#initializeJob} is run at a
* later time to set proper private permissions on the job directories. <br>
*
* @param user
@@ -331,16 +320,15 @@ public class Localizer {
* @param user
* @param jobId
* @param attemptId
- * @param isCleanupAttempt
* @throws IOException
*/
public void initializeAttemptDirs(String user, String jobId,
- String attemptId, boolean isCleanupAttempt)
+ String attemptId)
throws IOException {
boolean initStatus = false;
String attemptDirPath =
- TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+ TaskTracker.getLocalTaskDir(user, jobId, attemptId);
for (String localDir : localDirs) {
Path localAttemptDir = new Path(localDir, attemptDirPath);
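
Callers of Localizer now construct it without a TaskController, since user-dir and job-dir permissioning moved into the TaskController itself. A minimal construction sketch, assuming illustrative directory values and an invented class name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;

// Sketch: build a Localizer from the local FS and the mapred.local.dir entries.
public class LocalizerWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] localDirs =
        conf.getStrings("mapred.local.dir", "/tmp/mapred/local");
    Localizer localizer = new Localizer(FileSystem.getLocal(conf), localDirs);
    // e.g. localizer.initializeAttemptDirs(user, jobId, attemptId), which no
    // longer takes the isCleanupAttempt flag after this patch
    System.out.println("localizer ready: " + localizer);
  }
}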
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java Fri Mar 4 04:43:33 2011
@@ -24,9 +24,12 @@ import java.util.concurrent.LinkedBlocki
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskLogsTruncater;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.UserLogCleaner;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* This manages user logs on the {@link TaskTracker}.
@@ -37,6 +40,7 @@ public class UserLogManager {
new LinkedBlockingQueue<UserLogEvent>();
private TaskLogsTruncater taskLogsTruncater;
private UserLogCleaner userLogCleaner;
+ private final TaskController taskController;
private Thread monitorLogEvents = new Thread() {
@Override
@@ -56,18 +60,50 @@ public class UserLogManager {
*
* It should be explicitly started using {@link #start()} to start functioning
*
- * @param conf
- * The {@link Configuration}
+ * @param conf The {@link Configuration}
+ * @param taskController The task controller to delete the log files
+ *
+ * @throws IOException
+ */
+ public UserLogManager(Configuration conf,
+ TaskController taskController) throws IOException {
+ this.taskController = taskController;
+ setFields(conf);
+ }
+
+ /**
+ * Create the user log manager to manage user logs on {@link TaskTracker}.
+ * This constructor is there mainly for unit tests.
*
+ * @param conf The {@link Configuration}
+ *
* @throws IOException
*/
public UserLogManager(Configuration conf) throws IOException {
+ Class<? extends TaskController> taskControllerClass =
+ conf.getClass("mapred.task.tracker.task-controller",
+ DefaultTaskController.class, TaskController.class);
+ TaskController taskController =
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+ this.taskController = taskController;
+ setFields(conf);
+ }
+
+ private void setFields(Configuration conf) throws IOException {
taskLogsTruncater = new TaskLogsTruncater(conf);
userLogCleaner = new UserLogCleaner(this, conf);
monitorLogEvents.setDaemon(true);
}
/**
+ * Get the taskController for deleting logs.
+ * @return the TaskController
+ */
+ public TaskController getTaskController() {
+ return taskController;
+ }
+
+ /**
* Starts managing the logs
*/
public void start() {
@@ -101,13 +137,12 @@ public class UserLogManager {
* TT's conf
* @throws IOException
*/
- public void clearOldUserLogs(Configuration conf)
- throws IOException {
+ public void clearOldUserLogs(Configuration conf) throws IOException {
userLogCleaner.clearOldUserLogs(conf);
}
private void doJvmFinishedAction(JvmFinishedEvent event) {
- taskLogsTruncater.truncateLogs(event.getJvmInfo());
+ // do nothing
}
private void doJobStartedAction(JobStartedEvent event) {
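
UserLogManager can now be handed the TaskTracker's TaskController directly, or build one itself from the configuration (the single-argument form is mainly for tests). A minimal wiring sketch under an invented class name, mirroring the TaskTracker-side setup shown earlier in this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.DefaultTaskController;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch: instantiate the configured TaskController and pass it to the new
// two-argument constructor, then start log management.
public class UserLogManagerWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Class<? extends TaskController> clazz =
        conf.getClass("mapred.task.tracker.task-controller",
                      DefaultTaskController.class, TaskController.class);
    TaskController controller = ReflectionUtils.newInstance(clazz, conf);
    UserLogManager manager = new UserLogManager(conf, controller);
    manager.start();   // the one-argument UserLogManager(conf) is the test path
  }
}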
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 04:43:33 2011
@@ -34,6 +34,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobLocalizer;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapreduce.Job;
@@ -69,7 +71,9 @@ public class TestTrackerDistributedCache
private static final int LOCAL_CACHE_SUBDIR = 2;
protected Configuration conf;
protected Path firstCacheFile;
+ protected Path firstCacheFilePublic;
protected Path secondCacheFile;
+ protected Path secondCacheFilePublic;
private FileSystem fs;
protected LocalDirAllocator localDirAllocator =
@@ -107,18 +111,22 @@ public class TestTrackerDistributedCache
taskControllerClass, conf);
// setup permissions for mapred local dir
- taskController.setup();
+ taskController.setup(localDirAllocator);
// Create the temporary cache files to be used in the tests.
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+ firstCacheFilePublic = new Path(TEST_ROOT_DIR, "firstcachefileOne");
+ secondCacheFilePublic = new Path(TEST_ROOT_DIR, "secondcachefileOne");
+ createPublicTempFile(firstCacheFilePublic);
+ createPublicTempFile(secondCacheFilePublic);
createPrivateTempFile(firstCacheFile);
createPrivateTempFile(secondCacheFile);
}
protected void refreshConf(Configuration conf) throws IOException {
taskController.setConf(conf);
- taskController.setup();
+ taskController.setup(localDirAllocator);
}
/**
@@ -146,6 +154,7 @@ public class TestTrackerDistributedCache
Configuration subConf = new Configuration(conf);
String userName = getJobOwnerName();
subConf.set("user.name", userName);
+ JobID jobid = new JobID("jt",1);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf,
FileSystem.get(subConf));
@@ -162,13 +171,16 @@ public class TestTrackerDistributedCache
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
TaskDistributedCacheManager handle =
- manager.newTaskDistributedCacheManager(subConf);
+ manager.newTaskDistributedCacheManager(jobid, subConf);
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName),
- TaskTracker.getPublicDistributedCacheDir());
- // ****** End of imitating TaskRunner code
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+ JobLocalizer.downloadPrivateCache(subConf);
+ // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
+// handle.setupPrivateCache(localDirAllocator, TaskTracker
+// .getPrivateDistributedCacheDir(userName));
+// // ****** End of imitating TaskRunner code
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
assertNotNull(null, localCacheFiles);
@@ -201,14 +213,17 @@ public class TestTrackerDistributedCache
}
@Override
- Path localizeCache(Configuration conf, URI cache, long confFileStamp,
- CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
- boolean isPublic) throws IOException {
- if (cache.equals(firstCacheFile.toUri())) {
+ Path localizePublicCacheObject(Configuration conf, URI cache,
+ long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive) throws IOException {
+ if (cache.equals(firstCacheFilePublic.toUri())) {
throw new IOException("fake fail");
}
- return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
- fileStatus, isArchive, isPublic);
+ return super.localizePublicCacheObject(conf, cache, confFileStamp,
+ cacheStatus, fileStatus,
+ isArchive);
}
}
@@ -234,10 +249,10 @@ public class TestTrackerDistributedCache
// Task localizing for first job
TaskDistributedCacheManager handle = manager
- .newTaskDistributedCacheManager(conf1);
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName),
- TaskTracker.getPublicDistributedCacheDir());
+ .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+ JobLocalizer.downloadPrivateCache(conf1);
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
@@ -252,7 +267,7 @@ public class TestTrackerDistributedCache
Configuration conf2 = job2.getConfiguration();
conf2.set("user.name", userName);
// add a file that would get failed to localize
- DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
+ DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
// add a file that is already localized by different job
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
// add a file that is never localized
@@ -263,12 +278,12 @@ public class TestTrackerDistributedCache
// Task localizing for second job
// localization for the "firstCacheFile" will fail.
- handle = manager.newTaskDistributedCacheManager(conf2);
+ handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
Throwable th = null;
try {
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName),
- TaskTracker.getPublicDistributedCacheDir());
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+ JobLocalizer.downloadPrivateCache(conf2);
} catch (IOException e) {
th = e;
LOG.info("Exception during setup", e);
@@ -316,6 +331,26 @@ public class TestTrackerDistributedCache
}
}
+ private void appendBooleanArray(StringBuilder buffer, boolean[] data) {
+ if (data != null && data.length != 0) {
+ buffer.append(data[0]);
+ for(int i=1; i < data.length; i++) {
+ buffer.append(',');
+ buffer.append(data[i]);
+ }
+ }
+ }
+
+ private void appendLongArray(StringBuilder buffer, long[] data) {
+ if (data != null && data.length != 0) {
+ buffer.append(data[0]);
+ for(int i=1; i < data.length; i++) {
+ buffer.append(',');
+ buffer.append(data[i]);
+ }
+ }
+ }
+
private void appendUriArray(StringBuilder buffer, URI[] data) {
if (data != null && data.length != 0) {
buffer.append(data[0]);
@@ -333,15 +368,15 @@ public class TestTrackerDistributedCache
buf.append("\nArchives:");
appendUriArray(buf, DistributedCache.getCacheArchives(conf1));
buf.append("\nFile Visible:");
- appendStringArray(buf, TrackerDistributedCacheManager.getFileVisibilities
+ appendBooleanArray(buf, TrackerDistributedCacheManager.getFileVisibilities
(conf1));
buf.append("\nArchive Visible:");
- appendStringArray(buf, TrackerDistributedCacheManager.getArchiveVisibilities
+ appendBooleanArray(buf, TrackerDistributedCacheManager.getArchiveVisibilities
(conf1));
buf.append("\nFile timestamps:");
- appendStringArray(buf, DistributedCache.getFileTimestamps(conf1));
+ appendLongArray(buf, DistributedCache.getFileTimestamps(conf1));
buf.append("\nArchive timestamps:");
- appendStringArray(buf, DistributedCache.getArchiveTimestamps(conf1));
+ appendLongArray(buf, DistributedCache.getArchiveTimestamps(conf1));
LOG.info("state = " + buf.toString());
}
@@ -367,10 +402,10 @@ public class TestTrackerDistributedCache
// Task localizing for job
TaskDistributedCacheManager handle = manager
- .newTaskDistributedCacheManager(conf1);
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName),
- TaskTracker.getPublicDistributedCacheDir());
+ .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+ JobLocalizer.downloadPrivateCache(conf1);
TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
String distCacheDir;
if (visibility) {
@@ -381,8 +416,7 @@ public class TestTrackerDistributedCache
Path localizedPath =
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
fs.getFileStatus(cacheFile), false,
- c.timestamp, new Path(TEST_ROOT_DIR), false,
- visibility);
+ c.timestamp, visibility);
assertTrue("Cache file didn't get localized in the expected directory. " +
"Expected localization to happen within " +
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
@@ -494,20 +528,21 @@ public class TestTrackerDistributedCache
conf2.set("user.name", userName);
// We first test the size limit
- Path firstLocalCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
+ Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
- fs.getFileStatus(firstCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false, false);
- manager.releaseCache(firstCacheFile.toUri(), conf2, now,
+ fs.getFileStatus(firstCacheFilePublic), false,
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
+ manager.releaseCache(firstCacheFilePublic.toUri(), conf2,
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
//The code above localized a file of size 4K and then released the cache,
// which causes the cache to be deleted once the size limit is exceeded.
// The code below localizes another cache that is designed to
//sweep away the first cache.
- Path secondLocalCache = manager.getLocalCache(secondCacheFile.toUri(), conf2,
+ Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
- fs.getFileStatus(secondCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ fs.getFileStatus(secondCacheFilePublic), false,
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
assertFalse("DistributedCache failed deleting old" +
" cache when the cache store is full.",
localfs.exists(firstLocalCache));
@@ -530,7 +565,6 @@ public class TestTrackerDistributedCache
System.err.println("That directory ends up with "
+ localfs.listStatus(firstCursor).length
+ " subdirectories");
-
Path cachesBase = firstCursor;
assertFalse
@@ -545,21 +579,23 @@ public class TestTrackerDistributedCache
Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
// Add two more small files so that they trigger the subdirectory limit
// but do not trigger the file size limit.
- createTempFile(thirdCacheFile, 1);
- createTempFile(fourthCacheFile, 1);
+ createPublicTempFile(thirdCacheFile);
+ createPublicTempFile(fourthCacheFile);
Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(thirdCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false, false);
+ fs.getFileStatus(thirdCacheFile).getModificationTime(),
+ true);
// Release the third cache so that it can be deleted while sweeping
- manager.releaseCache(thirdCacheFile.toUri(), conf2, now,
+ manager.releaseCache(thirdCacheFile.toUri(), conf2,
+ fs.getFileStatus(thirdCacheFile).getModificationTime(),
TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
// Getting the fourth cache makes the number of subdirectories become
// 3, which is greater than 2, so the released cache will be deleted.
Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(fourthCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ fs.getFileStatus(fourthCacheFile).getModificationTime(), true);
assertFalse("DistributedCache failed deleting old" +
" cache when the cache exceeds the number of sub directories limit.",
localfs.exists(thirdLocalCache));
@@ -590,7 +626,7 @@ public class TestTrackerDistributedCache
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
System.currentTimeMillis(),
- new Path(TEST_ROOT_DIR), false, false);
+ false);
assertNotNull("DistributedCache cached file on non-default filesystem.",
result);
}
@@ -625,6 +661,8 @@ public class TestTrackerDistributedCache
protected void tearDown() throws IOException {
new File(firstCacheFile.toString()).delete();
new File(secondCacheFile.toString()).delete();
+ new File(firstCacheFilePublic.toString()).delete();
+ new File(secondCacheFilePublic.toString()).delete();
FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
}
@@ -677,12 +715,14 @@ public class TestTrackerDistributedCache
// ****** Imitate TaskRunner code.
TaskDistributedCacheManager handle =
- manager.newTaskDistributedCacheManager(subConf);
+ manager.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName),
- TaskTracker.getPublicDistributedCacheDir());
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+ //TODO this doesn't really happen in the TaskRunner
+// handle.setupPrivateCache(localDirAllocator, TaskTracker
+// .getPrivateDistributedCacheDir(userName));
// ****** End of imitating TaskRunner code
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -702,8 +742,10 @@ public class TestTrackerDistributedCache
// running a task of the same job
Throwable th = null;
try {
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+// handle.setupPrivateCache(localDirAllocator, TaskTracker
+// .getPrivateDistributedCacheDir(userName));
} catch (IOException ie) {
th = ie;
}
@@ -721,9 +763,9 @@ public class TestTrackerDistributedCache
TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
handle =
- manager.newTaskDistributedCacheManager(subConf2);
- handle.setup(localDirAllocator, workDir, TaskTracker
- .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+ manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);
+ handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
assertNotNull(null, localCacheFiles2);
assertEquals(1, localCacheFiles2.length);
@@ -758,20 +800,20 @@ public class TestTrackerDistributedCache
long now = System.currentTimeMillis();
Path[] localCache = new Path[2];
- localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
+ localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
- fs.getFileStatus(firstCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false, false);
+ fs.getFileStatus(firstCacheFilePublic), false,
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
FsPermission myPermission = new FsPermission((short)0600);
Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
if (FileSystem.create(localfs, myFile, myPermission) == null) {
throw new IOException("Could not create " + myFile);
}
try {
- localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
+ localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
- fs.getFileStatus(secondCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ fs.getFileStatus(secondCacheFilePublic), false,
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
FileStatus stat = localfs.getFileStatus(myFile);
assertTrue(stat.getPermission().equals(myPermission));
// validate permissions of localized files.
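
The reworked assertions above all follow the same task-side localization sequence introduced by this patch: obtain a per-job TaskDistributedCacheManager, set up the public and private cache directories, then let JobLocalizer pull down the private files. A minimal sketch of that sequence, assuming the two-argument setupCache and the static JobLocalizer.downloadPrivateCache shown in the hunks; the helper name localizeFor is illustrative:

    package org.apache.hadoop.filecache;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobLocalizer;
    import org.apache.hadoop.mapred.TaskTracker;

    // Sketch of the per-task cache localization sequence used by the updated tests.
    public class CacheSetupSketch {
      static TaskDistributedCacheManager localizeFor(
          TrackerDistributedCacheManager manager, JobID jobId,
          Configuration conf, String userName) throws IOException {
        // One TaskDistributedCacheManager per job, keyed by JobID in this patch.
        TaskDistributedCacheManager handle =
            manager.newTaskDistributedCacheManager(jobId, conf);
        // Public entries go under the shared public dir, private entries under
        // the per-user private dir.
        handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
                          TaskTracker.getPrivateDistributedCacheDir(userName));
        // Private files are now downloaded by the JobLocalizer, not by setup().
        JobLocalizer.downloadPrivateCache(conf);
        return handle;
      }
    }

When the task finishes, handle.release() drops the reference counts, as the tests above do before asserting that getReferenceCount() has returned to zero.
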
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar 4 04:43:33 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -74,17 +75,16 @@ public class ClusterWithLinuxTaskControl
+ "/task-controller";
@Override
- public void setup() throws IOException {
+ public void setup(LocalDirAllocator allocator) throws IOException {
// get the current ugi and set the task controller group owner
getConf().set(TT_GROUP, taskTrackerSpecialGroup);
// write configuration file
configurationFile = createTaskControllerConf(System
.getProperty(TASKCONTROLLER_PATH), getConf());
- super.setup();
+ super.setup(allocator);
}
- @Override
protected String getTaskControllerExecutablePath() {
return taskControllerExePath;
}
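
The override above reflects the new TaskController.setup signature, which now receives the LocalDirAllocator it should use for local directories. A minimal usage sketch, assuming DefaultTaskController is visible from the mapred package as in the tests below; the wrapper class name is illustrative:

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;

    public class TaskControllerSetupSketch {
      // Build and initialize a task controller via the new
      // setup(LocalDirAllocator) entry point.
      static TaskController newDefaultController(Configuration conf)
          throws IOException {
        TaskController controller = new DefaultTaskController();
        controller.setConf(conf);
        // The allocator is keyed on mapred.local.dir, matching the tests here.
        controller.setup(new LocalDirAllocator("mapred.local.dir"));
        return controller;
      }
    }
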
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Fri Mar 4 04:43:33 2011
@@ -74,7 +74,7 @@ public class TestJobTrackerSafeMode exte
* - check that after all the trackers are recovered, scheduling is opened
*/
private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr)
- throws IOException {
+ throws IOException, InterruptedException {
FileSystem fileSys = dfs.getFileSystem();
JobConf jobConf = mr.createJobConf();
String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
@@ -218,8 +218,9 @@ public class TestJobTrackerSafeMode exte
/**
* Test {@link JobTracker}'s safe mode.
+ * @throws InterruptedException
*/
- public void testJobTrackerSafeMode() throws IOException {
+ public void testJobTrackerSafeMode() throws IOException,InterruptedException{
String namenode = null;
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
@@ -278,7 +279,8 @@ public class TestJobTrackerSafeMode exte
}
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args)
+ throws IOException, InterruptedException {
new TestJobTrackerSafeMode().testJobTrackerSafeMode();
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java Fri Mar 4 04:43:33 2011
@@ -24,12 +24,19 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Vector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import static org.junit.Assert.*;
import org.junit.Before;
@@ -43,6 +50,8 @@ public class TestJvmManager {
private TaskTracker tt;
private JvmManager jvmManager;
private JobConf ttConf;
+ private boolean threadCaughtException = false;
+ private String user;
@Before
public void setUp() {
@@ -55,16 +64,24 @@ public class TestJvmManager {
}
public TestJvmManager() throws Exception {
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
tt = new TaskTracker();
ttConf = new JobConf();
ttConf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 2000);
tt.setConf(ttConf);
tt.setMaxMapSlots(MAP_SLOTS);
tt.setMaxReduceSlots(REDUCE_SLOTS);
- tt.setTaskController(new DefaultTaskController());
+ TaskController dtc;
+ tt.setTaskController((dtc = new DefaultTaskController()));
+ Configuration conf = new Configuration();
+ dtc.setConf(conf);
+ LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+ tt.getTaskController().setup(ldirAlloc);
+ JobID jobId = new JobID("test", 0);
jvmManager = new JvmManager(tt);
tt.setJvmManagerInstance(jvmManager);
tt.setUserLogManager(new UserLogManager(ttConf));
+ tt.setCleanupThread(new InlineCleanupQueue());
}
// write a shell script to execute the command.
@@ -100,15 +117,21 @@ public class TestJvmManager {
JobConf taskConf = new JobConf(ttConf);
TaskAttemptID attemptID = new TaskAttemptID("test", 0, true, 0, 0);
Task task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+ task.setUser(user);
task.setConf(taskConf);
TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
File pidFile = new File(TEST_DIR, "pid");
- final TaskRunner taskRunner = task.createRunner(tt, tip);
+ RunningJob rjob = new RunningJob(attemptID.getJobID());
+ TaskController taskController = new DefaultTaskController();
+ taskController.setConf(ttConf);
+ rjob.distCacheMgr =
+ new TrackerDistributedCacheManager(ttConf, taskController).
+ newTaskDistributedCacheManager(attemptID.getJobID(), taskConf);
+ final TaskRunner taskRunner = task.createRunner(tt, tip, rjob);
// launch a jvm which sleeps for 60 seconds
final Vector<String> vargs = new Vector<String>(2);
vargs.add(writeScript("SLEEP", "sleep 60\n", pidFile).getAbsolutePath());
final File workDir = new File(TEST_DIR, "work");
- workDir.mkdir();
final File stdout = new File(TEST_DIR, "stdout");
final File stderr = new File(TEST_DIR, "stderr");
@@ -117,10 +140,13 @@ public class TestJvmManager {
public void run() {
try {
taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
- workDir, null);
+ workDir);
} catch (InterruptedException e) {
e.printStackTrace();
return;
+ } catch (IOException e) {
+ e.printStackTrace();
+ setThreadCaughtException();
}
}
};
@@ -148,7 +174,14 @@ public class TestJvmManager {
final JvmRunner jvmRunner = mapJvmManager.jvmIdToRunner.get(jvmid);
Thread killer = new Thread() {
public void run() {
- jvmRunner.kill();
+ try {
+ jvmRunner.kill();
+ } catch (IOException e) {
+ e.printStackTrace();
+ setThreadCaughtException();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
};
killer.start();
@@ -164,21 +197,25 @@ public class TestJvmManager {
// launch another jvm and see it finishes properly
attemptID = new TaskAttemptID("test", 0, true, 0, 1);
task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+ task.setUser(user);
task.setConf(taskConf);
tip = tt.new TaskInProgress(task, taskConf);
- TaskRunner taskRunner2 = task.createRunner(tt, tip);
+ TaskRunner taskRunner2 = task.createRunner(tt, tip, rjob);
// build dummy vargs to call ls
Vector<String> vargs2 = new Vector<String>(1);
vargs2.add(writeScript("LS", "ls", pidFile).getAbsolutePath());
File workDir2 = new File(TEST_DIR, "work2");
- workDir.mkdir();
File stdout2 = new File(TEST_DIR, "stdout2");
File stderr2 = new File(TEST_DIR, "stderr2");
- taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2,
- null);
+ taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2);
// join all the threads
killer.join();
jvmRunner.join();
launcher.join();
+ assertFalse("Thread caught unexpected IOException",
+ threadCaughtException);
+ }
+ private void setThreadCaughtException() {
+ threadCaughtException = true;
}
}
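
The TestJvmManager changes show the extra wiring a caller now needs before building a TaskRunner: each RunningJob carries a per-job TaskDistributedCacheManager, and Task.createRunner takes the RunningJob as a third argument. A condensed sketch of that wiring, assuming the same package-private access to RunningJob and distCacheMgr that the test relies on; the helper name is illustrative:

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.mapred.TaskTracker.RunningJob;
    import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;

    public class TaskRunnerWiringSketch {
      // Wire up the per-job distributed-cache manager before creating a runner,
      // mirroring the sequence in TestJvmManager above.
      static TaskRunner createRunner(TaskTracker tt, Task task, JobConf taskConf,
                                     JobConf ttConf) throws IOException {
        TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
        RunningJob rjob = new RunningJob(task.getTaskID().getJobID());
        TaskController controller = new DefaultTaskController();
        controller.setConf(ttConf);
        // Each RunningJob now owns a TaskDistributedCacheManager for its job.
        rjob.distCacheMgr =
            new TrackerDistributedCacheManager(ttConf, controller)
                .newTaskDistributedCacheManager(task.getTaskID().getJobID(),
                                                taskConf);
        // createRunner now takes the RunningJob so the runner can reach the cache.
        return task.createRunner(tt, tip, rjob);
      }
    }
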
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java Fri Mar 4 04:43:33 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
import junit.framework.TestCase;
@@ -42,11 +43,6 @@ public class TestLinuxTaskController ext
public static class MyLinuxTaskController extends LinuxTaskController {
String taskControllerExePath = taskControllerPath + "/task-controller";
-
- @Override
- protected String getTaskControllerExecutablePath() {
- return taskControllerExePath;
- }
}
private void validateTaskControllerSetup(TaskController controller,
@@ -55,7 +51,7 @@ public class TestLinuxTaskController ext
// task controller setup should fail validating permissions.
Throwable th = null;
try {
- controller.setup();
+ controller.setup(new LocalDirAllocator("mapred.local.dir"));
} catch (IOException ie) {
th = ie;
}
@@ -64,7 +60,7 @@ public class TestLinuxTaskController ext
+ INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
"with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
} else {
- controller.setup();
+ controller.setup(new LocalDirAllocator("mapred.local.dir"));
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 04:43:33 2011
@@ -156,7 +156,6 @@ public class TestMiniMRWithDFS extends T
.isDirectory());
LOG.info("Verifying contents of mapred.local.dir "
+ localDir.getAbsolutePath());
-
// Verify contents(user-dir) of tracker-sub-dir
File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
if (trackerSubDir.isDirectory()) {