You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:43 UTC

svn commit: r1077522 - in /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred: JobInProgress.java JobTracker.java

Author: omalley
Date: Fri Mar  4 04:23:43 2011
New Revision: 1077522

URL: http://svn.apache.org/viewvc?rev=1077522&view=rev
Log:
commit 7a90d648e61be0f573c0b1a85bc9276f309f89f7
Author: Devaraj Das <dd...@yahoo-inc.com>
Date:   Thu Jul 1 11:35:33 2010 -0700

    MAPREDUCE:1900 from https://issues.apache.org/jira/secure/attachment/12448509/fs-close-delta.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1900. Fixes a FS leak that i missed in the earlier patch.
    +    (ddas)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077522&r1=1077521&r2=1077522&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:23:43 2011
@@ -325,95 +325,103 @@ public class JobInProgress {
   JobInProgress(JobTracker jobtracker, final JobConf default_conf, 
       JobInfo jobInfo, int rCount, Credentials ts) 
   throws IOException, InterruptedException {
-    this.restartCount = rCount;
-    this.jobId = JobID.downgrade(jobInfo.getJobID());
-    String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
-    this.jobtracker = jobtracker;
-    this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
-    this.status.setUsername(jobInfo.getUser().toString());
-    this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
-    this.startTime = jobtracker.getClock().getTime();
-    status.setStartTime(startTime);
-    this.localFs = jobtracker.getLocalFileSystem();
-
-    this.tokenStorage = ts;
-    // use the user supplied token to add user credentials to the conf
-    jobSubmitDir = jobInfo.getJobSubmitDir();
-    user = jobInfo.getUser().toString();
-    userUGI = UserGroupInformation.createRemoteUser(user);
-    if (ts != null) {
-      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-        userUGI.addToken(token);
-      }
+    try {
+      this.restartCount = rCount;
+      this.jobId = JobID.downgrade(jobInfo.getJobID());
+      String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
+      + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
+      this.jobtracker = jobtracker;
+      this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
+      this.status.setUsername(jobInfo.getUser().toString());
+      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+      this.startTime = jobtracker.getClock().getTime();
+      status.setStartTime(startTime);
+      this.localFs = jobtracker.getLocalFileSystem();
+
+      this.tokenStorage = ts;
+      // use the user supplied token to add user credentials to the conf
+      jobSubmitDir = jobInfo.getJobSubmitDir();
+      user = jobInfo.getUser().toString();
+      userUGI = UserGroupInformation.createRemoteUser(user);
+      if (ts != null) {
+        for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+          userUGI.addToken(token);
+        }
+      }
+
+      fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        public FileSystem run() throws IOException {
+          return jobSubmitDir.getFileSystem(default_conf);
+        }});
+      this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
+          +"/"+jobId + ".xml");
+      Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+      jobFile = jobFilePath.toString();
+      fs.copyToLocalFile(jobFilePath, localJobFile);
+      conf = new JobConf(localJobFile);
+      if (conf.getUser() == null) {
+        this.conf.setUser(user);
+      }
+      if (!conf.getUser().equals(user)) {
+        String desc = "The username " + conf.getUser() + " obtained from the " +
+        "conf doesn't match the username " + user + " the user " +
+        "authenticated as";
+        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(), 
+            jobId.toString(), desc);
+        throw new IOException(desc);
+      }
+      this.priority = conf.getJobPriority();
+      this.status.setJobPriority(this.priority);
+      this.profile = new JobProfile(user, jobId, 
+          jobFile, url, conf.getJobName(),
+          conf.getQueueName());
+
+      this.submitHostName = conf.getJobSubmitHostName();
+      this.submitHostAddress = conf.getJobSubmitHostAddress();
+      this.numMapTasks = conf.getNumMapTasks();
+      this.numReduceTasks = conf.getNumReduceTasks();
+
+      this.memoryPerMap = conf.getMemoryForMapTask();
+      this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+      this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+      (numMapTasks + numReduceTasks + 10);
+
+      // Construct the jobACLs
+      status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
+      this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+      this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+      this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
+      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+      this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
+      this.jobMetrics.setTag("user", conf.getUser());
+      this.jobMetrics.setTag("sessionId", conf.getSessionId());
+      this.jobMetrics.setTag("jobName", conf.getJobName());
+      this.jobMetrics.setTag("jobId", jobId.toString());
+      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+      this.maxLevel = jobtracker.getNumTaskCacheLevels();
+      this.anyCacheLevel = this.maxLevel+1;
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      this.resourceEstimator = new ResourceEstimator(this);
+
+      // register job's tokens for renewal
+      DelegationTokenRenewal.registerDelegationTokensForRenewal(
+          jobInfo.getJobID(), ts, jobtracker.getConf());
+    } finally {
+      //close all FileSystems that was created above for the current user
+      //At this point, this constructor is called in the context of an RPC, and
+      //hence the "current user" is actually referring to the kerberos
+      //authenticated user (if security is ON).
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     }
-
-    fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
-      public FileSystem run() throws IOException {
-        return jobSubmitDir.getFileSystem(default_conf);
-      }});
-    this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
-        +"/"+jobId + ".xml");
-    Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
-    jobFile = jobFilePath.toString();
-    fs.copyToLocalFile(jobFilePath, localJobFile);
-    conf = new JobConf(localJobFile);
-    if (conf.getUser() == null) {
-      this.conf.setUser(user);
-    }
-    if (!conf.getUser().equals(user)) {
-      String desc = "The username " + conf.getUser() + " obtained from the " +
-      		"conf doesn't match the username " + user + " the user " +
-      				"authenticated as";
-      AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(), 
-          jobId.toString(), desc);
-      throw new IOException(desc);
-    }
-    this.priority = conf.getJobPriority();
-    this.status.setJobPriority(this.priority);
-    this.profile = new JobProfile(user, jobId, 
-                                  jobFile, url, conf.getJobName(),
-                                  conf.getQueueName());
-
-    this.submitHostName = conf.getJobSubmitHostName();
-    this.submitHostAddress = conf.getJobSubmitHostAddress();
-    this.numMapTasks = conf.getNumMapTasks();
-    this.numReduceTasks = conf.getNumReduceTasks();
-    
-    this.memoryPerMap = conf.getMemoryForMapTask();
-    this.memoryPerReduce = conf.getMemoryForReduceTask();
-    
-    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
-       (numMapTasks + numReduceTasks + 10);
-
-    // Construct the jobACLs
-    status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
-
-    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
-    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
-        
-    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-    
-    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
-    this.jobMetrics.setTag("user", conf.getUser());
-    this.jobMetrics.setTag("sessionId", conf.getSessionId());
-    this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", jobId.toString());
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-    this.maxLevel = jobtracker.getNumTaskCacheLevels();
-    this.anyCacheLevel = this.maxLevel+1;
-    this.nonLocalMaps = new LinkedList<TaskInProgress>();
-    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
-    this.runningReduces = new LinkedHashSet<TaskInProgress>();
-    this.resourceEstimator = new ResourceEstimator(this);
-    
-    // register job's tokens for renewal
-    DelegationTokenRenewal.registerDelegationTokensForRenewal(
-         jobInfo.getJobID(), ts, jobtracker.getConf());
   }
 
   /**
@@ -2983,7 +2991,7 @@ public class JobInProgress {
 
     //close the user's FS
     try {
-      FileSystem.closeAllForUGI(userUGI);
+      fs.close();
     } catch (IOException ie) {
       LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
           " while closing FileSystem for " + userUGI);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077522&r1=1077521&r2=1077522&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:23:43 2011
@@ -1973,6 +1973,7 @@ public class JobTracker implements MRCon
   // Some jobs are stored in a local system directory.  We can delete
   // the files when we're done with the job.
   static final String SUBDIR = "jobTracker";
+  final LocalFileSystem localFs;
   FileSystem fs = null;
   Path systemDir = null;
   JobConf conf;
@@ -2159,6 +2160,7 @@ public class JobTracker implements MRCon
     // ... ensure we have the correct info
     this.port = interTrackerServer.getListenerAddress().getPort();
     this.conf.set("mapred.job.tracker", (this.localMachine + ":" + this.port));
+    this.localFs = FileSystem.getLocal(conf);
     LOG.info("JobTracker up at: " + this.port);
     this.infoPort = this.infoServer.getPort();
     this.conf.set("mapred.job.tracker.http.address", 
@@ -2333,7 +2335,7 @@ public class JobTracker implements MRCon
    * localizing job files to the local disk.
    */
   LocalFileSystem getLocalFileSystem() throws IOException {
-    return FileSystem.getLocal(conf);
+    return localFs;
   }
 
   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {