You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:43 UTC
svn commit: r1077522 - in
/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred:
JobInProgress.java JobTracker.java
Author: omalley
Date: Fri Mar 4 04:23:43 2011
New Revision: 1077522
URL: http://svn.apache.org/viewvc?rev=1077522&view=rev
Log:
commit 7a90d648e61be0f573c0b1a85bc9276f309f89f7
Author: Devaraj Das <dd...@yahoo-inc.com>
Date: Thu Jul 1 11:35:33 2010 -0700
MAPREDUCE:1900 from https://issues.apache.org/jira/secure/attachment/12448509/fs-close-delta.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1900. Fixes a FS leak that i missed in the earlier patch.
+ (ddas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077522&r1=1077521&r2=1077522&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:23:43 2011
@@ -325,95 +325,103 @@ public class JobInProgress {
JobInProgress(JobTracker jobtracker, final JobConf default_conf,
JobInfo jobInfo, int rCount, Credentials ts)
throws IOException, InterruptedException {
- this.restartCount = rCount;
- this.jobId = JobID.downgrade(jobInfo.getJobID());
- String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
- + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
- this.jobtracker = jobtracker;
- this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
- this.status.setUsername(jobInfo.getUser().toString());
- this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
- this.startTime = jobtracker.getClock().getTime();
- status.setStartTime(startTime);
- this.localFs = jobtracker.getLocalFileSystem();
-
- this.tokenStorage = ts;
- // use the user supplied token to add user credentials to the conf
- jobSubmitDir = jobInfo.getJobSubmitDir();
- user = jobInfo.getUser().toString();
- userUGI = UserGroupInformation.createRemoteUser(user);
- if (ts != null) {
- for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
- userUGI.addToken(token);
- }
+ try {
+ this.restartCount = rCount;
+ this.jobId = JobID.downgrade(jobInfo.getJobID());
+ String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
+ + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
+ this.jobtracker = jobtracker;
+ this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
+ this.status.setUsername(jobInfo.getUser().toString());
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+ this.startTime = jobtracker.getClock().getTime();
+ status.setStartTime(startTime);
+ this.localFs = jobtracker.getLocalFileSystem();
+
+ this.tokenStorage = ts;
+ // use the user supplied token to add user credentials to the conf
+ jobSubmitDir = jobInfo.getJobSubmitDir();
+ user = jobInfo.getUser().toString();
+ userUGI = UserGroupInformation.createRemoteUser(user);
+ if (ts != null) {
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+ userUGI.addToken(token);
+ }
+ }
+
+ fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws IOException {
+ return jobSubmitDir.getFileSystem(default_conf);
+ }});
+ this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
+ +"/"+jobId + ".xml");
+ Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+ jobFile = jobFilePath.toString();
+ fs.copyToLocalFile(jobFilePath, localJobFile);
+ conf = new JobConf(localJobFile);
+ if (conf.getUser() == null) {
+ this.conf.setUser(user);
+ }
+ if (!conf.getUser().equals(user)) {
+ String desc = "The username " + conf.getUser() + " obtained from the " +
+ "conf doesn't match the username " + user + " the user " +
+ "authenticated as";
+ AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(),
+ jobId.toString(), desc);
+ throw new IOException(desc);
+ }
+ this.priority = conf.getJobPriority();
+ this.status.setJobPriority(this.priority);
+ this.profile = new JobProfile(user, jobId,
+ jobFile, url, conf.getJobName(),
+ conf.getQueueName());
+
+ this.submitHostName = conf.getJobSubmitHostName();
+ this.submitHostAddress = conf.getJobSubmitHostAddress();
+ this.numMapTasks = conf.getNumMapTasks();
+ this.numReduceTasks = conf.getNumReduceTasks();
+
+ this.memoryPerMap = conf.getMemoryForMapTask();
+ this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+ this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+ (numMapTasks + numReduceTasks + 10);
+
+ // Construct the jobACLs
+ status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
+ this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+ this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+ this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
+ MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+ this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
+ this.jobMetrics.setTag("user", conf.getUser());
+ this.jobMetrics.setTag("sessionId", conf.getSessionId());
+ this.jobMetrics.setTag("jobName", conf.getJobName());
+ this.jobMetrics.setTag("jobId", jobId.toString());
+ hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+ hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+ this.maxLevel = jobtracker.getNumTaskCacheLevels();
+ this.anyCacheLevel = this.maxLevel+1;
+ this.nonLocalMaps = new LinkedList<TaskInProgress>();
+ this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+ this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+ this.nonRunningReduces = new LinkedList<TaskInProgress>();
+ this.runningReduces = new LinkedHashSet<TaskInProgress>();
+ this.resourceEstimator = new ResourceEstimator(this);
+
+ // register job's tokens for renewal
+ DelegationTokenRenewal.registerDelegationTokensForRenewal(
+ jobInfo.getJobID(), ts, jobtracker.getConf());
+ } finally {
+ //close all FileSystems that was created above for the current user
+ //At this point, this constructor is called in the context of an RPC, and
+ //hence the "current user" is actually referring to the kerberos
+ //authenticated user (if security is ON).
+ FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
}
-
- fs = userUGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws IOException {
- return jobSubmitDir.getFileSystem(default_conf);
- }});
- this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
- +"/"+jobId + ".xml");
- Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
- jobFile = jobFilePath.toString();
- fs.copyToLocalFile(jobFilePath, localJobFile);
- conf = new JobConf(localJobFile);
- if (conf.getUser() == null) {
- this.conf.setUser(user);
- }
- if (!conf.getUser().equals(user)) {
- String desc = "The username " + conf.getUser() + " obtained from the " +
- "conf doesn't match the username " + user + " the user " +
- "authenticated as";
- AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(), conf.getUser(),
- jobId.toString(), desc);
- throw new IOException(desc);
- }
- this.priority = conf.getJobPriority();
- this.status.setJobPriority(this.priority);
- this.profile = new JobProfile(user, jobId,
- jobFile, url, conf.getJobName(),
- conf.getQueueName());
-
- this.submitHostName = conf.getJobSubmitHostName();
- this.submitHostAddress = conf.getJobSubmitHostAddress();
- this.numMapTasks = conf.getNumMapTasks();
- this.numReduceTasks = conf.getNumReduceTasks();
-
- this.memoryPerMap = conf.getMemoryForMapTask();
- this.memoryPerReduce = conf.getMemoryForReduceTask();
-
- this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
- (numMapTasks + numReduceTasks + 10);
-
- // Construct the jobACLs
- status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
-
- this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
- this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
-
- this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
- this.jobMetrics.setTag("user", conf.getUser());
- this.jobMetrics.setTag("sessionId", conf.getSessionId());
- this.jobMetrics.setTag("jobName", conf.getJobName());
- this.jobMetrics.setTag("jobId", jobId.toString());
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
- this.maxLevel = jobtracker.getNumTaskCacheLevels();
- this.anyCacheLevel = this.maxLevel+1;
- this.nonLocalMaps = new LinkedList<TaskInProgress>();
- this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
- this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
- this.nonRunningReduces = new LinkedList<TaskInProgress>();
- this.runningReduces = new LinkedHashSet<TaskInProgress>();
- this.resourceEstimator = new ResourceEstimator(this);
-
- // register job's tokens for renewal
- DelegationTokenRenewal.registerDelegationTokensForRenewal(
- jobInfo.getJobID(), ts, jobtracker.getConf());
}
/**
@@ -2983,7 +2991,7 @@ public class JobInProgress {
//close the user's FS
try {
- FileSystem.closeAllForUGI(userUGI);
+ fs.close();
} catch (IOException ie) {
LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) +
" while closing FileSystem for " + userUGI);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077522&r1=1077521&r2=1077522&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 04:23:43 2011
@@ -1973,6 +1973,7 @@ public class JobTracker implements MRCon
// Some jobs are stored in a local system directory. We can delete
// the files when we're done with the job.
static final String SUBDIR = "jobTracker";
+ final LocalFileSystem localFs;
FileSystem fs = null;
Path systemDir = null;
JobConf conf;
@@ -2159,6 +2160,7 @@ public class JobTracker implements MRCon
// ... ensure we have the correct info
this.port = interTrackerServer.getListenerAddress().getPort();
this.conf.set("mapred.job.tracker", (this.localMachine + ":" + this.port));
+ this.localFs = FileSystem.getLocal(conf);
LOG.info("JobTracker up at: " + this.port);
this.infoPort = this.infoServer.getPort();
this.conf.set("mapred.job.tracker.http.address",
@@ -2333,7 +2335,7 @@ public class JobTracker implements MRCon
* localizing job files to the local disk.
*/
LocalFileSystem getLocalFileSystem() throws IOException {
- return FileSystem.getLocal(conf);
+ return localFs;
}
public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {