You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:45:24 UTC
svn commit: r1077695 -
/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Author: omalley
Date: Fri Mar 4 04:45:23 2011
New Revision: 1077695
URL: http://svn.apache.org/viewvc?rev=1077695&view=rev
Log:
commit b284ae518f10ce86350b93a9fc8ef674329dc95c
Author: Devaraj Das <dd...@yahoo-inc.com>
Date: Sat Sep 18 10:50:46 2010 -0700
Fixed an issue in the DistCache setup so that it is performed within a doAs block. Also made the removal of the job files unconditional; previously it first checked whether the job had been localized.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077695&r1=1077694&r2=1077695&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 04:45:23 2011
@@ -980,7 +980,7 @@ public class TaskTracker implements MRCo
* of this job as a starting point.
* @throws IOException
*/
- Path initializeJob(final Task t, RunningJob rjob)
+ Path initializeJob(final Task t, final RunningJob rjob)
throws IOException, InterruptedException {
final JobID jobId = t.getJobID();
@@ -1006,47 +1006,42 @@ public class TaskTracker implements MRCo
// Download the job.xml for this job from the system FS
final Path localJobFile =
localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
- JobConf localJobConf = new JobConf(localJobFile);
-
- // Setup the public distributed cache
- TaskDistributedCacheManager taskDistributedCacheManager =
- getTrackerDistributedCacheManager()
- .newTaskDistributedCacheManager(jobId, localJobConf);
- rjob.distCacheMgr = taskDistributedCacheManager;
- taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
- TaskTracker.getPrivateDistributedCacheDir(userName));
-
- // Set some config values
- localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
- getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
- if (conf.get("slave.host.name") != null) {
- localJobConf.set("slave.host.name", conf.get("slave.host.name"));
- }
- resetNumTasksPerJvm(localJobConf);
- localJobConf.setUser(t.getUser());
-
- // write back the config (this config will have the updates that the
- // distributed cache manager makes as well)
- JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
/**
* Now initialize the job via task-controller to do the rest of the
- * job-init. Do this within a doAs since the distributed cache is also set
- * up in {@link TaskController#initializeJob(String, JobID, Path, Path)}
+ * job-init. Do this within a doAs since the public distributed cache
+ * is also set up here.
* To support potential authenticated HDFS accesses, we need the tokens
*/
rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
try {
+ final JobConf localJobConf = new JobConf(localJobFile);
+ // Setup the public distributed cache
+ TaskDistributedCacheManager taskDistributedCacheManager =
+ getTrackerDistributedCacheManager()
+ .newTaskDistributedCacheManager(jobId, localJobConf);
+ rjob.distCacheMgr = taskDistributedCacheManager;
+ taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+
+ // Set some config values
+ localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ if (conf.get("slave.host.name") != null) {
+ localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+ }
+ resetNumTasksPerJvm(localJobConf);
+ localJobConf.setUser(t.getUser());
+
+ // write back the config (this config will have the updates that the
+ // distributed cache manager makes as well)
+ JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
taskController.initializeJob(t.getUser(), jobId.toString(),
new Path(localJobTokenFile), localJobFile, TaskTracker.this);
} catch (IOException e) {
- try {
- // called holding lock on RunningJob
- removeJobFiles(t.getUser(), jobId);
- } catch (IOException e2) {
- LOG.warn("Failed to add " + jobId + " to cleanup queue", e2);
- }
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(e));
throw e;
}
return null;
@@ -1727,7 +1722,7 @@ public class TaskTracker implements MRCo
}
// Delete the job directory for this
// task if the job is done/failed
- if (!rjob.keepJobFiles && rjob.localized) {
+ if (!rjob.keepJobFiles) {
removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
}
// add job to user log manager