You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:53:46 UTC
svn commit: r1077227 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/
mapred/org/apache/hadoop/mapreduce/security/token/
test/org/apache/hadoop/mapreduce/security/token/
Author: omalley
Date: Fri Mar 4 03:53:46 2011
New Revision: 1077227
URL: http://svn.apache.org/viewvc?rev=1077227&view=rev
Log:
commit 4f96f72d2bcb23da170e72a0bc8adad95f11e24c
Author: Devaraj Das <dd...@yahoo-inc.com>
Date: Thu Feb 25 18:25:29 2010 -0800
MAPREDUCE-1532 from https://issues.apache.org/jira/secure/attachment/12437096/1532-bp20.4.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1532. Ensures all filesystem operations at the client are done
+ as the job submitter. Also, changes the renewal to maintain a list of tokens
+ to renew. (ddas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077227&r1=1077226&r2=1077227&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 03:53:46 2011
@@ -401,6 +401,7 @@ public class JobClient extends Configure
private Path stagingAreaDir = null;
private FileSystem fs = null;
+ private UserGroupInformation ugi;
/**
* Create a job client.
@@ -427,6 +428,7 @@ public class JobClient extends Configure
*/
public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
+ this.ugi = UserGroupInformation.getCurrentUser();
if ("local".equals(tracker)) {
conf.setNumMapTasks(1);
this.jobSubmitClient = new LocalJobRunner(conf);
@@ -451,6 +453,7 @@ public class JobClient extends Configure
*/
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
+ this.ugi = UserGroupInformation.getCurrentUser();
jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
}
@@ -468,13 +471,22 @@ public class JobClient extends Configure
* for submission to the MapReduce system.
*
* @return the filesystem handle.
+ * @throws IOException
*/
public synchronized FileSystem getFs() throws IOException {
if (this.fs == null) {
- Path sysDir = getSystemDir();
- this.fs = sysDir.getFileSystem(getConf());
+ try {
+ this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws IOException {
+ Path sysDir = getSystemDir();
+ return sysDir.getFileSystem(getConf());
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
- return fs;
+ return this.fs;
}
/* see if two file systems are the same or not
@@ -517,8 +529,9 @@ public class JobClient extends Configure
// copies a file to the jobtracker filesystem and returns the path where it
// was copied to
- private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath,
- JobConf job, short replication) throws IOException {
+ private Path copyRemoteFiles(FileSystem jtFs, Path parentDir,
+ final Path originalPath, final JobConf job, short replication)
+ throws IOException, InterruptedException {
//check if we do not need to copy the files
// is jt using the same file system.
// just checking for uri strings... doing no dns lookups
@@ -527,6 +540,7 @@ public class JobClient extends Configure
FileSystem remoteFs = null;
remoteFs = originalPath.getFileSystem(job);
+
if (compareFs(remoteFs, jtFs)) {
return originalPath;
}
@@ -546,7 +560,7 @@ public class JobClient extends Configure
* @throws IOException
*/
private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir)
- throws IOException {
+ throws IOException, InterruptedException {
short replication = (short)job.getInt("mapred.submit.replication", 10);
copyAndConfigureFiles(job, jobSubmitDir, replication);
@@ -557,7 +571,7 @@ public class JobClient extends Configure
}
private void copyAndConfigureFiles(JobConf job, Path submitJobDir,
- short replication) throws IOException {
+ short replication) throws IOException, InterruptedException {
if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
@@ -721,7 +735,7 @@ public class JobClient extends Configure
* @throws IOException
*/
public
- RunningJob submitJobInternal(JobConf job
+ RunningJob submitJobInternal(final JobConf job
) throws FileNotFoundException,
ClassNotFoundException,
InterruptedException,
@@ -729,67 +743,80 @@ public class JobClient extends Configure
/*
* configure the command line options correctly on the submitting dfs
*/
- Path jobStagingArea = JobSubmissionFiles.getStagingDir(this, job);
- JobID jobId = jobSubmitClient.getNewJobId();
- Path submitJobDir = new Path(jobStagingArea, jobId.toString());
- job.set("mapreduce.job.dir", submitJobDir.toString());
- JobStatus status = null;
- try {
-
- copyAndConfigureFiles(job, submitJobDir);
-
- // get delegation token for the dir
- TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, job);
-
- Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
- int reduces = job.getNumReduceTasks();
- JobContext context = new JobContext(job, jobId);
-
- job = (JobConf)context.getConfiguration();
+ return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
+ public RunningJob run() throws FileNotFoundException,
+ ClassNotFoundException,
+ InterruptedException,
+ IOException{
+
+ JobConf jobCopy = job;
+ Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
+ jobCopy);
+ JobID jobId = jobSubmitClient.getNewJobId();
+ Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+ jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
+ JobStatus status = null;
+ try {
- // Check the output specification
- if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
- org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
- ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
- output.checkOutputSpecs(context);
- } else {
- job.getOutputFormat().checkOutputSpecs(fs, job);
- }
+ copyAndConfigureFiles(jobCopy, submitJobDir);
+
+ // get delegation token for the dir
+ TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir},
+ jobCopy);
+
+ Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
+ int reduces = jobCopy.getNumReduceTasks();
+ JobContext context = new JobContext(jobCopy, jobId);
+
+ jobCopy = (JobConf)context.getConfiguration();
+
+ // Check the output specification
+ if (reduces == 0 ? jobCopy.getUseNewMapper() :
+ jobCopy.getUseNewReducer()) {
+ org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
+ ReflectionUtils.newInstance(context.getOutputFormatClass(),
+ jobCopy);
+ output.checkOutputSpecs(context);
+ } else {
+ jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
+ }
- // Create the splits for the job
- LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
- int maps = writeSplits(context, submitJobDir);
- job.setNumMapTasks(maps);
-
- // Write job file to JobTracker's fs
- FSDataOutputStream out =
- FileSystem.create(fs, submitJobFile,
- new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+ // Create the splits for the job
+ LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
+ int maps = writeSplits(context, submitJobDir);
+ jobCopy.setNumMapTasks(maps);
+
+ // Write job file to JobTracker's fs
+ FSDataOutputStream out =
+ FileSystem.create(fs, submitJobFile,
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+
+ try {
+ jobCopy.writeXml(out);
+ } finally {
+ out.close();
+ }
- try {
- job.writeXml(out);
- } finally {
- out.close();
- }
-
- //
- // Now, actually submit the job (using the submit name)
- //
- populateTokenCache(job);
- status = jobSubmitClient.submitJob(
- jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
- if (status != null) {
- return new NetworkedJob(status);
- } else {
- throw new IOException("Could not launch job");
- }
- } finally {
- if (status == null) {
- LOG.info("Cleaning up the staging area " + submitJobDir);
- fs.delete(submitJobDir, true);
+ //
+ // Now, actually submit the job (using the submit name)
+ //
+ populateTokenCache(jobCopy);
+ status = jobSubmitClient.submitJob(
+ jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
+ if (status != null) {
+ return new NetworkedJob(status);
+ } else {
+ throw new IOException("Could not launch job");
+ }
+ } finally {
+ if (status == null) {
+ LOG.info("Cleaning up the staging area " + submitJobDir);
+ fs.delete(submitJobDir, true);
+ }
+ }
}
- }
+ });
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077227&r1=1077226&r2=1077227&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:53:46 2011
@@ -102,6 +102,7 @@ import org.apache.hadoop.util.VersionInf
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.security.TokenStorage;
@@ -2446,6 +2447,7 @@ public class JobTracker implements MRCon
ex.printStackTrace();
}
}
+ DelegationTokenRenewal.close();
LOG.info("stopped all jobtracker services");
return;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1077227&r1=1077226&r2=1077227&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java Fri Mar 4 03:53:46 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -468,9 +469,15 @@ public class Job extends JobContext {
/**
* Open a connection to the JobTracker
* @throws IOException
+ * @throws InterruptedException
*/
- private void connect() throws IOException {
- jobClient = new JobClient((JobConf) getConfiguration());
+ private void connect() throws IOException, InterruptedException {
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException {
+ jobClient = new JobClient((JobConf) getConfiguration());
+ return null;
+ }
+ });
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077227&r1=1077226&r2=1077227&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar 4 03:53:46 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* A read-only view of the job that is provided to the tasks while they
@@ -59,9 +60,16 @@ public class JobContext {
public static final String JOB_CANCEL_DELEGATION_TOKEN =
"mapreduce.job.complete.cancel.delegation.tokens";
+ protected UserGroupInformation ugi;
+
public JobContext(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
this.jobId = jobId;
+ try {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1077227&r1=1077226&r2=1077227&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Mar 4 03:53:46 2011
@@ -18,16 +18,15 @@
package org.apache.hadoop.mapreduce.security.token;
+import java.io.IOException;
import java.net.URI;
-import java.security.AccessControlException;
+import org.apache.hadoop.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -80,6 +79,14 @@ public class DelegationTokenRenewal {
public String toString() {
return token + ";exp="+expirationDate;
}
+ @Override
+ public boolean equals (Object obj) {
+ return token.equals(((DelegationTokenToRenew)obj).token);
+ }
+ @Override
+ public int hashCode() {
+ return token.hashCode();
+ }
}
// global single timer (daemon)
@@ -87,19 +94,14 @@ public class DelegationTokenRenewal {
//managing the list of tokens using Map
// jobId=>List<tokens>
- private static Map<JobID, List<DelegationTokenToRenew>> delegationTokens =
- Collections.synchronizedMap(new HashMap<JobID,
- List<DelegationTokenToRenew>>());
+ private static List<DelegationTokenToRenew> delegationTokens =
+ Collections.synchronizedList(new ArrayList<DelegationTokenToRenew>());
//adding token
- private static void addTokenToMap(DelegationTokenToRenew t) {
- // see if a list already exists
- JobID jobId = t.jobId;
- List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
- if(l==null) {
- l = new ArrayList<DelegationTokenToRenew>();
- delegationTokens.put(jobId, l);
- }
- l.add(t);
+ private static void addTokenToList(DelegationTokenToRenew t) {
+ //check to see if the token already exists in the list
+ if (delegationTokens.contains(t))
+ return;
+ delegationTokens.add(t);
}
// kind of tokens we currently renew
@@ -128,7 +130,7 @@ public class DelegationTokenRenewal {
DelegationTokenToRenew dtr =
new DelegationTokenToRenew(jobId, dt, conf, now);
- addTokenToMap(dtr);
+ addTokenToList(dtr);
setTimerForTokenRenewal(dtr, true);
LOG.info("registering token for renewal for service =" + dt.getService()+
@@ -147,15 +149,14 @@ public class DelegationTokenRenewal {
DistributedFileSystem dfs = getDFSForToken(token, conf);
newExpirationDate = dfs.renewDelegationToken(token);
} catch (InvalidToken ite) {
- LOG.warn("token canceled - not scheduling for renew");
- removeFailedDelegationToken(dttr);
- throw new Exception("failed to renew token", ite);
- } catch (AccessControlException ace) {
- LOG.warn("token canceled - not scheduling for renew");
+ LOG.warn("invalid token - not scheduling for renew");
removeFailedDelegationToken(dttr);
- throw new Exception("failed to renew token", ace);
- } catch (Exception ioe) {
+ throw new IOException("failed to renew token", ite);
+ } catch (AccessControlException ioe) {
LOG.warn("failed to renew token:"+token, ioe);
+ removeFailedDelegationToken(dttr);
+ } catch (Exception e) {
+ LOG.warn("failed to renew token:"+token, e);
// returns default expiration date
}
} else {
@@ -266,26 +267,13 @@ public class DelegationTokenRenewal {
*/
private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
JobID jobId = t.jobId;
- List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
- if(l==null) return;
-
- Iterator<DelegationTokenToRenew> it = l.iterator();
- while(it.hasNext()) {
- DelegationTokenToRenew dttr = it.next();
- if(dttr == t) {
- if (LOG.isDebugEnabled())
- LOG.debug("removing failed delegation token for jobid=" + jobId +
- ";t=" + dttr.token.getService());
-
- // cancel the timer
- if(dttr.timerTask!=null)
- dttr.timerTask.cancel();
-
- // no need to cancel the token - it is invalid
- it.remove();
- break; //should be only one
- }
- }
+ if (LOG.isDebugEnabled())
+ LOG.debug("removing failed delegation token for jobid=" + jobId +
+ ";t=" + t.token.getService());
+ delegationTokens.remove(t);
+ // cancel the timer
+ if(t.timerTask!=null)
+ t.timerTask.cancel();
}
/**
@@ -293,24 +281,25 @@ public class DelegationTokenRenewal {
* @param jobId
*/
public static void removeDelegationTokenRenewalForJob(JobID jobId) {
- List<DelegationTokenToRenew> l = delegationTokens.remove(jobId);
- if(l==null) return;
+ synchronized (delegationTokens) {
+ Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
+ while(it.hasNext()) {
+ DelegationTokenToRenew dttr = it.next();
+ if (dttr.jobId.equals(jobId)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("removing delegation token for jobid=" + jobId +
+ ";t=" + dttr.token.getService());
+
+ // cancel the timer
+ if(dttr.timerTask!=null)
+ dttr.timerTask.cancel();
- Iterator<DelegationTokenToRenew> it = l.iterator();
- while(it.hasNext()) {
- DelegationTokenToRenew dttr = it.next();
- if (LOG.isDebugEnabled())
- LOG.debug("removing delegation token for jobid=" + jobId +
- ";t=" + dttr.token.getService());
-
- // cancel the timer
- if(dttr.timerTask!=null)
- dttr.timerTask.cancel();
+ // cancel the token
+ cancelToken(dttr);
- // cancel the token
- cancelToken(dttr);
-
- it.remove();
+ it.remove();
+ }
+ }
}
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1077227&r1=1077226&r2=1077227&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Mar 4 03:53:46 2011
@@ -233,7 +233,7 @@ public class TestDelegationTokenRenewal
// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
- int attempts = 4;
+ int attempts = 10;
while(attempts-- > 0) {
try {
Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
@@ -269,16 +269,10 @@ public class TestDelegationTokenRenewal
JobID jid2 = new JobID("job2",1);
DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
- numberOfExpectedRenewals++; // one more initial renewal
- attempts = 4;
- while(attempts-- > 0) {
- try {
- Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
- } catch (InterruptedException e) {}
- // since we cannot guarantee timely execution - let's give few chances
- if(dfs.getCounter()==numberOfExpectedRenewals)
- break;
- }
+ numberOfExpectedRenewals = dfs.getCounter(); // number of renewals so far
+ try {
+ Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
+ } catch (InterruptedException e) {}
System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
// counter and the token should stil be the old ones