You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:45:31 UTC
svn commit: r1077143 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapreduce/security/ test/org/apache/hadoop/mapred/
test/org/apache/hadoop/mapreduce/security/
Author: omalley
Date: Fri Mar 4 03:45:31 2011
New Revision: 1077143
URL: http://svn.apache.org/viewvc?rev=1077143&view=rev
Log:
commit 499fe11038918ad2f8968d3c3b77b96e113b0c4e
Author: Boris Shkolnik <bo...@yahoo-inc.com>
Date: Tue Feb 2 10:31:13 2010 -0800
MAPREDUCE-1432 from https://issues.apache.org/jira/secure/attachment/12434550/MAPREDUCE-1432-BP20-2.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1432. Adds hooks in the jobtracker and tasktracker
+ for loading the tokens in the user's ugi. This is required for
+ the copying of files from the hdfs. (Devaraj Das via boryas)
+
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 03:45:31 2011
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -66,8 +67,10 @@ class Child {
JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
// file name is passed thru env
- String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
- TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
+ String jobTokenFile =
+ System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ TokenStorage ts =
+ TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() +
"; from file=" + jobTokenFile);
@@ -149,9 +152,10 @@ class Child {
JobConf job = new JobConf(task.getJobFile());
// set job shuffle token
- Token<? extends TokenIdentifier> jt = ts.getJobToken();
+ Token<? extends TokenIdentifier> jt = TokenCache.getJobToken(ts);
// set the jobTokenFile into task
- task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
+ task.setJobTokenSecret(JobTokenSecretManager.
+ createSecretKey(jt.getPassword()));
// setup the child's mapred-local-dir. The child is now sandboxed and
// can only see files down and under attemtdir only.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 03:45:31 2011
@@ -1800,7 +1800,7 @@ public class JobClient extends Configure
mapper.readValue(new File(localFileName), Map.class);
for(Map.Entry<String, String> ent: nm.entrySet()) {
- TokenCache.setSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+ TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
}
} catch (JsonMappingException e) {
json_error = true;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:45:31 2011
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapred.CleanupQ
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -54,6 +54,7 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -315,13 +316,20 @@ class JobInProgress {
status.setStartTime(startTime);
this.localFs = jobtracker.getLocalFileSystem();
+ this.tokenStorage = ts;
// use the user supplied token to add user credentials to the conf
jobSubmitDir = jobInfo.getJobSubmitDir();
user = jobInfo.getUser().toString();
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
- fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws IOException {
- return jobSubmitDir.getFileSystem(default_conf);
+ if (ts != null) {
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+ ugi.addToken(token);
+ }
+ }
+
+ fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws IOException {
+ return jobSubmitDir.getFileSystem(default_conf);
}});
this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
+"/"+jobId + ".xml");
@@ -366,7 +374,6 @@ class JobInProgress {
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
- this.tokenStorage = ts;
}
/**
@@ -3104,7 +3111,7 @@ class JobInProgress {
if(tokenStorage == null)
tokenStorage = new TokenStorage();
- tokenStorage.setJobToken(token);
+ TokenCache.setJobToken(token, tokenStorage);
// write TokenStorage out
tokenStorage.write(os);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar 4 03:45:31 2011
@@ -21,7 +21,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
/**
* Protocol that a JobClient and the central JobTracker use to communicate. The
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:45:31 2011
@@ -101,7 +101,7 @@ import org.apache.hadoop.mapreduce.Clust
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
/*******************************************************
* JobTracker is the central location for submitting and
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:45:31 2011
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.split
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
/** Implements MapReduce locally, in-process, for debugging. */
class LocalJobRunner implements JobSubmissionProtocol {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:45:31 2011
@@ -161,7 +161,8 @@ abstract class TaskRunner extends Thread
// We don't create any symlinks yet, so presence/absence of workDir
// actually on the file system doesn't matter.
UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(conf.getUser());
+ //UserGroupInformation.createRemoteUser(conf.getUser());
+ tracker.getRunningJob(t.getJobID()).getUGI();
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws IOException {
taskDistributedCacheManager =
@@ -498,9 +499,9 @@ abstract class TaskRunner extends Thread
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
- String jobTokenFile = conf.get(TokenCache.JOB_TOKEN_FILENAME);
+ String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
- env.put("JOB_TOKEN_FILE", jobTokenFile);
+ env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
// for the child of task jvm, set hadoop.root.logger
env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:45:31 2011
@@ -95,6 +95,7 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
@@ -102,7 +103,7 @@ import org.apache.hadoop.util.VersionInf
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
/*******************************************************
* TaskTracker is a process that starts and tracks MR Tasks
@@ -205,6 +206,10 @@ public class TaskTracker
return jobTokenSecretManager;
}
+ RunningJob getRunningJob(JobID jobId) {
+ return runningJobs.get(jobId);
+ }
+
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
boolean justStarted = true;
@@ -543,10 +548,11 @@ public class TaskTracker
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
}
- private FileSystem getFS(final Path filePath, String user,
+ private FileSystem getFS(final Path filePath, JobID jobId,
final Configuration conf) throws IOException, InterruptedException {
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
- FileSystem userFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ RunningJob rJob = runningJobs.get(jobId);
+ FileSystem userFs =
+ rJob.ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return filePath.getFileSystem(conf);
}});
@@ -940,7 +946,7 @@ public class TaskTracker
synchronized (rjob) {
if (!rjob.localized) {
- JobConf localJobConf = localizeJobFiles(t);
+ JobConf localJobConf = localizeJobFiles(t, rjob);
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -955,11 +961,6 @@ public class TaskTracker
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
-
- TokenStorage ts = TokenCache.loadTokens(rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
-
- Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
- getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
rjob.localized = true;
}
@@ -983,18 +984,35 @@ public class TaskTracker
* job as a starting point.
* @throws IOException
*/
- JobConf localizeJobFiles(Task t)
+ @SuppressWarnings("unchecked")
+ JobConf localizeJobFiles(Task t, RunningJob rjob)
throws IOException, InterruptedException {
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
String userName = t.getUser();
JobConf userConf = new JobConf(getJobConf());
- FileSystem userFs = getFS(jobFile, userName, userConf);
-
+
// Initialize the job directories first
FileSystem localFs = FileSystem.getLocal(fConf);
getLocalizer().initializeJobDirs(userName, jobId);
+
+ // save local copy of JobToken file
+ String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+ rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
+
+
+ TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+ Token<JobTokenIdentifier> jt =
+ (Token<JobTokenIdentifier>)TokenCache.getJobToken(ts);
+ if (jt != null) { //could be null in the case of some unit tests
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
+ }
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+ rjob.ugi.addToken(token);
+ }
+
+ FileSystem userFs = getFS(jobFile, jobId, userConf);
// Download the job.xml for this job from the system FS
Path localJobFile =
@@ -1004,6 +1022,11 @@ public class TaskTracker
//WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
//AS PART OF THE TASK OBJECT
localJobConf.setUser(userName);
+
+ // set the location of the token file into jobConf to transfer
+ // the name to TaskRunner
+ localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
+ localJobTokenFile.toString());
// create the 'job-work' directory: job-specific shared directory for use as
// scratch space by all tasks of the same job running on this TaskTracker.
Path workDir =
@@ -1019,9 +1042,6 @@ public class TaskTracker
// Download the job.jar for this job from the system FS
localizeJobJarFile(userName, jobId, userFs, localJobConf);
- // save local copy of JobToken file
- localizeJobTokenFile(userName, jobId, localJobConf);
-
return localJobConf;
}
@@ -3119,6 +3139,7 @@ public class TaskTracker
volatile Set<TaskInProgress> tasks;
boolean localized;
boolean keepJobFiles;
+ UserGroupInformation ugi;
FetchStatus f;
RunningJob(JobID jobid) {
this.jobid = jobid;
@@ -3131,6 +3152,10 @@ public class TaskTracker
return jobid;
}
+ UserGroupInformation getUGI() {
+ return ugi;
+ }
+
void setFetchStatus(FetchStatus f) {
this.f = f;
}
@@ -3701,11 +3726,10 @@ public class TaskTracker
* Download the job-token file from the FS and save on local fs.
* @param user
* @param jobId
- * @param jobConf
* @return the local file system path of the downloaded file.
* @throws IOException
*/
- private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+ private String localizeJobTokenFile(String user, JobID jobId)
throws IOException {
// check if the tokenJob file is there..
Path skPath = new Path(systemDirectory,
@@ -3720,13 +3744,14 @@ public class TaskTracker
lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
jobId.toString()), jobTokenSize, fConf);
- LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
- " to " + localJobTokenFile.toUri().getPath());
+ String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
+ if(LOG.isDebugEnabled())
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
+ " to " + localJobTokenFileStr);
// Download job_token
systemFS.copyToLocalFile(skPath, localJobTokenFile);
- // set it into jobConf to transfer the name to TaskRunner
- jobConf.set(TokenCache.JOB_TOKEN_FILENAME, localJobTokenFile.toString());
+ return localJobTokenFileStr;
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar 4 03:45:31 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.security;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
@@ -36,27 +35,20 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
- * this class keeps static references to TokenStorage object
- * also it provides auxiliary methods for setting and getting secret keys
+ * This class provides user facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.
*/
//@InterfaceStability.Evolving
public class TokenCache {
private static final Log LOG = LogFactory.getLog(TokenCache.class);
- /**
- * file name used on HDFS for generated job token
- */
- public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
-
- /**
- * conf setting for job tokens cache file name
- */
-
- public static final String JOB_TOKEN_FILENAME = "mapreduce.job.jobTokenFile";
private static TokenStorage tokenStorage;
@@ -76,7 +68,7 @@ public class TokenCache {
* @param alias
* @param key
*/
- public static void setSecretKey(Text alias, byte[] key) {
+ public static void addSecretKey(Text alias, byte[] key) {
getTokenStorage().addSecretKey(alias, key);
}
@@ -85,17 +77,7 @@ public class TokenCache {
*/
public static void addDelegationToken(
String namenode, Token<? extends TokenIdentifier> t) {
- getTokenStorage().setToken(new Text(namenode), t);
- }
-
- /**
- *
- * @param namenode
- * @return delegation token
- */
- @SuppressWarnings("unchecked")
- public static Token<DelegationTokenIdentifier> getDelegationToken(String namenode) {
- return (Token<DelegationTokenIdentifier>)getTokenStorage().getToken(new Text(namenode));
+ getTokenStorage().addToken(new Text(namenode), t);
}
/**
@@ -107,16 +89,81 @@ public class TokenCache {
}
/**
+ * Convenience method to obtain delegation tokens from namenodes
+ * corresponding to the paths passed.
+ * @param ps array of paths
+ * @param conf configuration
+ * @throws IOException
+ */
+ public static void obtainTokensForNamenodes(Path [] ps, Configuration conf)
+ throws IOException {
+ // get jobtracker principal id (for the renewer)
+ Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
+
+ for(Path p: ps) {
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ if(fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ URI uri = fs.getUri();
+ String fs_addr = buildDTServiceName(uri);
+
+ // see if we already have the token
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ continue;
+ }
+ // get the token
+ token = dfs.getDelegationToken(jtCreds);
+ if(token==null)
+ throw new IOException("Token from " + fs_addr + " is null");
+
+ token.setService(new Text(fs_addr));
+ TokenCache.addDelegationToken(fs_addr, token);
+ LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
+ ";t.service="+token.getService());
+ }
+ }
+ }
+
+ /**
+ * file name used on HDFS for generated job token
+ */
+ //@InterfaceAudience.Private
+ public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+ /**
+ * conf setting for job tokens cache file name
+ */
+ //@InterfaceAudience.Private
+ public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+ private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
+
+ /**
+ *
+ * @param namenode
+ * @return delegation token
+ */
+ @SuppressWarnings("unchecked")
+ //@InterfaceAudience.Private
+ public static Token<DelegationTokenIdentifier>
+ getDelegationToken(String namenode) {
+ return (Token<DelegationTokenIdentifier>)getTokenStorage().
+ getToken(new Text(namenode));
+ }
+
+ /**
* @return TokenStore object
*/
//@InterfaceAudience.Private
public static TokenStorage getTokenStorage() {
if(tokenStorage==null)
tokenStorage = new TokenStorage();
-
+
return tokenStorage;
}
-
+
/**
* sets TokenStorage
* @param ts
@@ -169,6 +216,29 @@ public class TokenCache {
return ts;
}
+ /**
+ * store job token
+ * @param t
+ */
+ //@InterfaceAudience.Private
+ public static void setJobToken(Token<? extends TokenIdentifier> t,
+ TokenStorage ts) {
+ ts.addToken(JOB_TOKEN, t);
+ }
+ /**
+ *
+ * @return job token
+ */
+ //@InterfaceAudience.Private
+ public static Token<? extends TokenIdentifier> getJobToken(TokenStorage ts) {
+ return ts.getToken(JOB_TOKEN);
+ }
+
+ /**
+ * create service name for Delegation token ip:port
+ * @param uri
+ * @return "ip:port"
+ */
static String buildDTServiceName(URI uri) {
int port = uri.getPort();
if(port == -1)
@@ -181,42 +251,4 @@ public class TokenCache {
sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
return sb.toString();
}
-
- /**
- * get Delegation for each distinct dfs for given paths.
- * @param ps
- * @param conf
- * @throws IOException
- */
- public static void obtainTokensForNamenodes(Path [] ps, Configuration conf)
- throws IOException {
- // get jobtracker principal id (for the renewer)
- Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
-
- for(Path p: ps) {
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- if(fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- URI uri = fs.getUri();
- String fs_addr = buildDTServiceName(uri);
-
- // see if we already have the token
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- // get the token
- token = dfs.getDelegationToken(jtCreds);
- if(token==null)
- throw new IOException("Token from " + fs_addr + " is null");
-
- token.setService(new Text(fs_addr));
- TokenCache.addDelegationToken(fs_addr, token);
- LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
- ";t.service="+token.getService());
- }
- }
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:45:31 2011
@@ -136,6 +136,11 @@ public class TestTaskTrackerLocalization
// Set up the task to be localized
String jtIdentifier = "200907202331";
jobId = new JobID(jtIdentifier, 1);
+
+ TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
+ rjob.ugi = UserGroupInformation.getCurrentUser();
+ tracker.runningJobs.put(jobId, rjob);
+
taskId =
new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
task =
@@ -354,7 +359,8 @@ public class TestTaskTrackerLocalization
tracker.getLocalizer().initializeUserDirs(task.getUser());
// /////////// The main method being tested
- localizedJobConf = tracker.localizeJobFiles(task);
+ localizedJobConf = tracker.localizeJobFiles(task,
+ new TaskTracker.RunningJob(task.getJobID()));
// ///////////
// Now initialize the job via task-controller so as to set
@@ -448,7 +454,8 @@ public class TestTaskTrackerLocalization
return;
}
tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf = tracker.localizeJobFiles(task);
+ localizedJobConf = tracker.localizeJobFiles(task,
+ new TaskTracker.RunningJob(task.getJobID()));
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir
@@ -654,7 +661,8 @@ public class TestTaskTrackerLocalization
throws Exception {
// Localize job and localize task.
tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf = tracker.localizeJobFiles(task);
+ localizedJobConf = tracker.localizeJobFiles(task,
+ new TaskTracker.RunningJob(task.getJobID()));
if (jvmReuse) {
localizedJobConf.setNumTasksToExecutePerJvm(2);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Mar 4 03:45:31 2011
@@ -49,7 +49,7 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -84,7 +84,7 @@ public class TestTokenCache {
System.out.println("inside MAP: ts==NULL?=" + (ts==null) +
"; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) +
- ";jobToken = " + (ts==null? "n/a":ts.getJobToken()) +
+ ";jobToken = " + (ts==null? "n/a":TokenCache.getJobToken(ts)) +
"; alias1 key=" + new String(key1) +
"; dts size= " + dts_size);
@@ -257,6 +257,7 @@ public class TestTokenCache {
p1 = fs.makeQualified(p1);
// do not qualify p2
+ TokenCache.setTokenStorage(new TokenStorage());
TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
// this token is keyed by hostname:port key.