You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:45:24 UTC

svn commit: r1077695 - /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: omalley
Date: Fri Mar  4 04:45:23 2011
New Revision: 1077695

URL: http://svn.apache.org/viewvc?rev=1077695&view=rev
Log:
commit b284ae518f10ce86350b93a9fc8ef674329dc95c
Author: Devaraj Das <dd...@yahoo-inc.com>
Date:   Sat Sep 18 10:50:46 2010 -0700

    Fixed an issue in the DistCache setup so that it is now done within a doAs. Also made the removal of the job files unconditional; previously they were removed only if the job had been localized.

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077695&r1=1077694&r2=1077695&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:45:23 2011
@@ -980,7 +980,7 @@ public class TaskTracker implements MRCo
    *         of this job as a starting point.
    * @throws IOException
    */
-  Path initializeJob(final Task t, RunningJob rjob)
+  Path initializeJob(final Task t, final RunningJob rjob)
   throws IOException, InterruptedException {
     final JobID jobId = t.getJobID();
 
@@ -1006,47 +1006,42 @@ public class TaskTracker implements MRCo
     // Download the job.xml for this job from the system FS
     final Path localJobFile =
       localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
-    JobConf localJobConf = new JobConf(localJobFile);
-
-    // Setup the public distributed cache
-    TaskDistributedCacheManager taskDistributedCacheManager =
-      getTrackerDistributedCacheManager()
-     .newTaskDistributedCacheManager(jobId, localJobConf);
-    rjob.distCacheMgr = taskDistributedCacheManager;
-    taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
-        TaskTracker.getPrivateDistributedCacheDir(userName));
-
-    // Set some config values
-    localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
-        getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
-    if (conf.get("slave.host.name") != null) {
-      localJobConf.set("slave.host.name", conf.get("slave.host.name"));
-    }
-    resetNumTasksPerJvm(localJobConf);
-    localJobConf.setUser(t.getUser());
-
-    // write back the config (this config will have the updates that the
-    // distributed cache manager makes as well)
-    JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
 
     /**
       * Now initialize the job via task-controller to do the rest of the
-      * job-init. Do this within a doAs since the distributed cache is also set
-      * up in {@link TaskController#initializeJob(String, JobID, Path, Path)}
+      * job-init. Do this within a doAs since the public distributed cache 
+      * is also set up here.
       * To support potential authenticated HDFS accesses, we need the tokens
       */
     rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws IOException {
         try {
+          final JobConf localJobConf = new JobConf(localJobFile);
+          // Setup the public distributed cache
+          TaskDistributedCacheManager taskDistributedCacheManager =
+            getTrackerDistributedCacheManager()
+           .newTaskDistributedCacheManager(jobId, localJobConf);
+          rjob.distCacheMgr = taskDistributedCacheManager;
+          taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+              TaskTracker.getPrivateDistributedCacheDir(userName));
+
+          // Set some config values
+          localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+              getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+          if (conf.get("slave.host.name") != null) {
+            localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+          }
+          resetNumTasksPerJvm(localJobConf);
+          localJobConf.setUser(t.getUser());
+
+          // write back the config (this config will have the updates that the
+          // distributed cache manager makes as well)
+          JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
           taskController.initializeJob(t.getUser(), jobId.toString(), 
               new Path(localJobTokenFile), localJobFile, TaskTracker.this);
         } catch (IOException e) {
-          try {
-            // called holding lock on RunningJob
-            removeJobFiles(t.getUser(), jobId);
-          } catch (IOException e2) {
-            LOG.warn("Failed to add " + jobId + " to cleanup queue", e2);
-          }
+          LOG.warn("Exception while localization " + 
+              StringUtils.stringifyException(e));
           throw e;
         }
         return null;
@@ -1727,7 +1722,7 @@ public class TaskTracker implements MRCo
         }
         // Delete the job directory for this  
         // task if the job is done/failed
-        if (!rjob.keepJobFiles && rjob.localized) {
+        if (!rjob.keepJobFiles) {
           removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
         }
         // add job to user log manager