You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:45:31 UTC

svn commit: r1077143 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/security/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapreduce/security/

Author: omalley
Date: Fri Mar  4 03:45:31 2011
New Revision: 1077143

URL: http://svn.apache.org/viewvc?rev=1077143&view=rev
Log:
commit 499fe11038918ad2f8968d3c3b77b96e113b0c4e
Author: Boris Shkolnik <bo...@yahoo-inc.com>
Date:   Tue Feb 2 10:31:13 2010 -0800

    MAPREDUCE-1432 from https://issues.apache.org/jira/secure/attachment/12434550/MAPREDUCE-1432-BP20-2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1432. Adds hooks in the jobtracker and tasktracker
    +    for loading the tokens in the user's ugi. This is required for
    +    the copying of files from the hdfs. (Devaraj Das via boryas)
    +

Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:45:31 2011
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -66,8 +67,10 @@ class Child {
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
 
     // file name is passed thru env
-    String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
-    TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
+    String jobTokenFile = 
+      System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    TokenStorage ts = 
+      TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
     LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
 
@@ -149,9 +152,10 @@ class Child {
         JobConf job = new JobConf(task.getJobFile());
 
         // set job shuffle token
-        Token<? extends TokenIdentifier> jt = ts.getJobToken();
+        Token<? extends TokenIdentifier> jt = TokenCache.getJobToken(ts);
         // set the jobTokenFile into task
-        task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
+        task.setJobTokenSecret(JobTokenSecretManager.
+            createSecretKey(jt.getPassword()));
 
         // setup the child's mapred-local-dir. The child is now sandboxed and
         // can only see files down and under attemtdir only.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 03:45:31 2011
@@ -1800,7 +1800,7 @@ public class JobClient extends Configure
           mapper.readValue(new File(localFileName), Map.class);
 
         for(Map.Entry<String, String> ent: nm.entrySet()) {
-          TokenCache.setSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+          TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
         }
       } catch (JsonMappingException e) {
         json_error = true;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:45:31 2011
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapred.CleanupQ
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -54,6 +54,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -315,13 +316,20 @@ class JobInProgress {
     status.setStartTime(startTime);
     this.localFs = jobtracker.getLocalFileSystem();
 
+    this.tokenStorage = ts;
     // use the user supplied token to add user credentials to the conf
     jobSubmitDir = jobInfo.getJobSubmitDir();
     user = jobInfo.getUser().toString();
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-      fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-        public FileSystem run() throws IOException {
-          return jobSubmitDir.getFileSystem(default_conf);
+    if (ts != null) {
+      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+        ugi.addToken(token);
+      }
+    }
+
+    fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws IOException {
+        return jobSubmitDir.getFileSystem(default_conf);
       }});
     this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
         +"/"+jobId + ".xml");
@@ -366,7 +374,6 @@ class JobInProgress {
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
-    this.tokenStorage = ts;
   }
 
   /**
@@ -3104,7 +3111,7 @@ class JobInProgress {
     if(tokenStorage == null)
       tokenStorage = new TokenStorage();
 
-    tokenStorage.setJobToken(token);
+    TokenCache.setJobToken(token, tokenStorage);
         
     // write TokenStorage out
     tokenStorage.write(os);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar  4 03:45:31 2011
@@ -21,7 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 
 /** 
  * Protocol that a JobClient and the central JobTracker use to communicate.  The

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:45:31 2011
@@ -101,7 +101,7 @@ import org.apache.hadoop.mapreduce.Clust
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:45:31 2011
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:45:31 2011
@@ -161,7 +161,8 @@ abstract class TaskRunner extends Thread
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
       UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(conf.getUser());
+        //UserGroupInformation.createRemoteUser(conf.getUser());
+        tracker.getRunningJob(t.getJobID()).getUGI();
       ugi.doAs(new PrivilegedExceptionAction<Void>() {
         public Void run() throws IOException {
           taskDistributedCacheManager =
@@ -498,9 +499,9 @@ abstract class TaskRunner extends Thread
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
 
-    String jobTokenFile = conf.get(TokenCache.JOB_TOKEN_FILENAME);
+    String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
     LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
-    env.put("JOB_TOKEN_FILE", jobTokenFile);
+    env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
     
     // for the child of task jvm, set hadoop.root.logger
     env.put("HADOOP_ROOT_LOGGER","INFO,TLA");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:45:31 2011
@@ -95,6 +95,7 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -102,7 +103,7 @@ import org.apache.hadoop.util.VersionInf
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -205,6 +206,10 @@ public class TaskTracker 
     return jobTokenSecretManager;
   }
 
+  RunningJob getRunningJob(JobID jobId) {
+    return runningJobs.get(jobId);
+  }
+
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
@@ -543,10 +548,11 @@ public class TaskTracker 
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
   }
 
-  private FileSystem getFS(final Path filePath, String user,
+  private FileSystem getFS(final Path filePath, JobID jobId,
       final Configuration conf) throws IOException, InterruptedException {
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-    FileSystem userFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    RunningJob rJob = runningJobs.get(jobId);
+    FileSystem userFs = 
+      rJob.ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
           return filePath.getFileSystem(conf);
       }});
@@ -940,7 +946,7 @@ public class TaskTracker 
 
     synchronized (rjob) {
       if (!rjob.localized) {
-        JobConf localJobConf = localizeJobFiles(t);
+        JobConf localJobConf = localizeJobFiles(t, rjob);
         
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -955,11 +961,6 @@ public class TaskTracker 
         rjob.jobConf = localJobConf;  
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-
-        TokenStorage ts = TokenCache.loadTokens(rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
-
-        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken(); 
-        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
  
         rjob.localized = true;
       }
@@ -983,18 +984,35 @@ public class TaskTracker 
    *         job as a starting point.
    * @throws IOException
    */
-  JobConf localizeJobFiles(Task t)
+  @SuppressWarnings("unchecked")
+  JobConf localizeJobFiles(Task t, RunningJob rjob)
       throws IOException, InterruptedException {
     JobID jobId = t.getJobID();
 
     Path jobFile = new Path(t.getJobFile());
     String userName = t.getUser();
     JobConf userConf = new JobConf(getJobConf());
-    FileSystem userFs = getFS(jobFile, userName, userConf);
-
+    
     // Initialize the job directories first
     FileSystem localFs = FileSystem.getLocal(fConf);
     getLocalizer().initializeJobDirs(userName, jobId);
+    
+    // save local copy of JobToken file
+    String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+    rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
+    
+    
+    TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Token<JobTokenIdentifier> jt = 
+      (Token<JobTokenIdentifier>)TokenCache.getJobToken(ts);
+    if (jt != null) { //could be null in the case of some unit tests
+      getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
+    }
+    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+      rjob.ugi.addToken(token);
+    }
+
+    FileSystem userFs = getFS(jobFile, jobId, userConf);
 
     // Download the job.xml for this job from the system FS
     Path localJobFile =
@@ -1004,6 +1022,11 @@ public class TaskTracker 
     //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
     //AS PART OF THE TASK OBJECT
     localJobConf.setUser(userName);
+    
+    // set the location of the token file into jobConf to transfer 
+    // the name to TaskRunner
+    localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
+        localJobTokenFile.toString());
     // create the 'job-work' directory: job-specific shared directory for use as
     // scratch space by all tasks of the same job running on this TaskTracker.
     Path workDir =
@@ -1019,9 +1042,6 @@ public class TaskTracker 
     // Download the job.jar for this job from the system FS
     localizeJobJarFile(userName, jobId, userFs, localJobConf);
 
-    // save local copy of JobToken file
-    localizeJobTokenFile(userName, jobId, localJobConf);
-
     return localJobConf;
   }
 
@@ -3119,6 +3139,7 @@ public class TaskTracker 
     volatile Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
+    UserGroupInformation ugi;
     FetchStatus f;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
@@ -3131,6 +3152,10 @@ public class TaskTracker 
       return jobid;
     }
       
+    UserGroupInformation getUGI() {
+      return ugi;
+    }
+
     void setFetchStatus(FetchStatus f) {
       this.f = f;
     }
@@ -3701,11 +3726,10 @@ public class TaskTracker 
      * Download the job-token file from the FS and save on local fs.
      * @param user
      * @param jobId
-     * @param jobConf
      * @return the local file system path of the downloaded file.
      * @throws IOException
      */
-    private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+  private String localizeJobTokenFile(String user, JobID jobId)
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory, 
@@ -3720,13 +3744,14 @@ public class TaskTracker 
           lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, 
               jobId.toString()), jobTokenSize, fConf);
     
-      LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
-          " to " + localJobTokenFile.toUri().getPath());
+      String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
+      if(LOG.isDebugEnabled())
+        LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
+            " to " + localJobTokenFileStr);
       
       // Download job_token
       systemFS.copyToLocalFile(skPath, localJobTokenFile);      
-      // set it into jobConf to transfer the name to TaskRunner
-      jobConf.set(TokenCache.JOB_TOKEN_FILENAME, localJobTokenFile.toString());
+      return localJobTokenFileStr;
     }
 
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar  4 03:45:31 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.security;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collection;
 
@@ -36,27 +35,20 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
- * this class keeps static references to TokenStorage object
- * also it provides auxiliary methods for setting and getting secret keys  
+ * This class provides user facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.   
  */
 //@InterfaceStability.Evolving
 public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
-  /**
-   * file name used on HDFS for generated job token
-   */
-  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
-
-  /**
-   * conf setting for job tokens cache file name
-   */
-
-  public static final String JOB_TOKEN_FILENAME = "mapreduce.job.jobTokenFile";
 
   private static TokenStorage tokenStorage;
   
@@ -76,7 +68,7 @@ public class TokenCache {
    * @param alias
    * @param key
    */
-  public static void setSecretKey(Text alias, byte[] key) {
+  public static void addSecretKey(Text alias, byte[] key) {
     getTokenStorage().addSecretKey(alias, key);
   }
   
@@ -85,17 +77,7 @@ public class TokenCache {
    */
   public static void addDelegationToken(
       String namenode, Token<? extends TokenIdentifier> t) {
-    getTokenStorage().setToken(new Text(namenode), t);
-  }
-  
-  /**
-   * 
-   * @param namenode
-   * @return delegation token
-   */
-  @SuppressWarnings("unchecked")
-  public static Token<DelegationTokenIdentifier> getDelegationToken(String namenode) {
-    return (Token<DelegationTokenIdentifier>)getTokenStorage().getToken(new Text(namenode));
+    getTokenStorage().addToken(new Text(namenode), t);
   }
 
   /**
@@ -107,16 +89,81 @@ public class TokenCache {
   }
   
   /**
+   * Convenience method to obtain delegation tokens from namenodes 
+   * corresponding to the paths passed.
+   * @param ps array of paths
+   * @param conf configuration
+   * @throws IOException
+   */
+  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
+  throws IOException {
+    // get jobtracker principal id (for the renewer)
+    Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
+
+    for(Path p: ps) {
+      FileSystem fs = FileSystem.get(p.toUri(), conf);
+      if(fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        URI uri = fs.getUri();
+        String fs_addr = buildDTServiceName(uri);
+
+        // see if we already have the token
+        Token<DelegationTokenIdentifier> token = 
+          TokenCache.getDelegationToken(fs_addr); 
+        if(token != null) {
+          LOG.debug("DT for " + token.getService()  + " is already present");
+          continue;
+        }
+        // get the token
+        token = dfs.getDelegationToken(jtCreds);
+        if(token==null) 
+          throw new IOException("Token from " + fs_addr + " is null");
+
+        token.setService(new Text(fs_addr));
+        TokenCache.addDelegationToken(fs_addr, token);
+        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
+            ";t.service="+token.getService());
+      }
+    }
+  }
+
+  /**
+   * file name used on HDFS for generated job token
+   */
+  //@InterfaceAudience.Private
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /**
+   * conf setting for job tokens cache file name
+   */
+  //@InterfaceAudience.Private
+  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+  private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
+
+  /**
+   * 
+   * @param namenode
+   * @return delegation token
+   */
+  @SuppressWarnings("unchecked")
+  //@InterfaceAudience.Private
+  public static Token<DelegationTokenIdentifier> 
+  getDelegationToken(String namenode) {
+    return (Token<DelegationTokenIdentifier>)getTokenStorage().
+    getToken(new Text(namenode));
+  }
+
+  /**
    * @return TokenStore object
    */
   //@InterfaceAudience.Private
   public static TokenStorage getTokenStorage() {
     if(tokenStorage==null)
       tokenStorage = new TokenStorage();
-    
+
     return tokenStorage;
   }
-  
+
   /**
    * sets TokenStorage
    * @param ts
@@ -169,6 +216,29 @@ public class TokenCache {
     return ts;
   }
 
+  /**
+   * store job token
+   * @param t
+   */
+  //@InterfaceAudience.Private
+  public static void setJobToken(Token<? extends TokenIdentifier> t, 
+      TokenStorage ts) {
+    ts.addToken(JOB_TOKEN, t);
+  }
+  /**
+   * 
+   * @return job token
+   */
+  //@InterfaceAudience.Private
+  public static Token<? extends TokenIdentifier> getJobToken(TokenStorage ts) {
+    return ts.getToken(JOB_TOKEN);
+  }
+
+  /**
+   * create service name for Delegation token ip:port
+   * @param uri
+   * @return "ip:port"
+   */
   static String buildDTServiceName(URI uri) {
     int port = uri.getPort();
     if(port == -1) 
@@ -181,42 +251,4 @@ public class TokenCache {
     sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
     return sb.toString();
   }
-    
-  /**
-   * get Delegation for each distinct dfs for given paths.
-   * @param ps
-   * @param conf
-   * @throws IOException
-   */
-  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
-  throws IOException {
-    // get jobtracker principal id (for the renewer)
-    Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
-
-    for(Path p: ps) {
-      FileSystem fs = FileSystem.get(p.toUri(), conf);
-      if(fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        URI uri = fs.getUri();
-        String fs_addr = buildDTServiceName(uri);
-        
-        // see if we already have the token
-        Token<DelegationTokenIdentifier> token =
-          TokenCache.getDelegationToken(fs_addr);
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
-          continue;
-        }
-        // get the token
-        token = dfs.getDelegationToken(jtCreds);
-        if(token==null)
-          throw new IOException("Token from " + fs_addr + " is null");
-
-        token.setService(new Text(fs_addr));
-        TokenCache.addDelegationToken(fs_addr, token);
-        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
-            ";t.service="+token.getService());
-      }
-    }
-  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 03:45:31 2011
@@ -136,6 +136,11 @@ public class TestTaskTrackerLocalization
     // Set up the task to be localized
     String jtIdentifier = "200907202331";
     jobId = new JobID(jtIdentifier, 1);
+    
+    TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
+    rjob.ugi = UserGroupInformation.getCurrentUser();
+    tracker.runningJobs.put(jobId, rjob);
+    
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
     task =
@@ -354,7 +359,8 @@ public class TestTaskTrackerLocalization
     tracker.getLocalizer().initializeUserDirs(task.getUser());
 
     // /////////// The main method being tested
-    localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task, 
+        new TaskTracker.RunningJob(task.getJobID()));
     // ///////////
 
     // Now initialize the job via task-controller so as to set
@@ -448,7 +454,8 @@ public class TestTaskTrackerLocalization
       return;
     }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task, 
+        new TaskTracker.RunningJob(task.getJobID()));
 
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
@@ -654,7 +661,8 @@ public class TestTaskTrackerLocalization
       throws Exception {
     // Localize job and localize task.
     tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task, 
+        new TaskTracker.RunningJob(task.getJobID()));
     if (jvmReuse) {
       localizedJobConf.setNumTasksToExecutePerJvm(2);
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077143&r1=1077142&r2=1077143&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Mar  4 03:45:31 2011
@@ -49,7 +49,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -84,7 +84,7 @@ public class TestTokenCache {
 
       System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
           "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
-          ";jobToken = " +  (ts==null? "n/a":ts.getJobToken()) +
+          ";jobToken = " +  (ts==null? "n/a":TokenCache.getJobToken(ts)) +
           "; alias1 key=" + new String(key1) + 
           "; dts size= " + dts_size);
     
@@ -257,6 +257,7 @@ public class TestTokenCache {
     p1 = fs.makeQualified(p1);
     // do not qualify p2
 
+    TokenCache.setTokenStorage(new TokenStorage());
     TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
 
     // this token is keyed by hostname:port key.