You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:45:20 UTC

svn commit: r1077141 - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/mapred/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/input/ sr...

Author: omalley
Date: Fri Mar  4 03:45:19 2011
New Revision: 1077141

URL: http://svn.apache.org/viewvc?rev=1077141&view=rev
Log:
commit 5dd9e6a03d8f026c4ed4af85013bff0ca2cc34a5
Author: Boris Shkolnik <bo...@yahoo-inc.com>
Date:   Mon Feb 1 22:52:13 2010 -0800

    MAPREDUCE:1383 from https://issues.apache.org/jira/secure/attachment/12434455/MAPREDUCE-1383-BP20-7.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1383. Automates fetching of delegation tokens in File*Formats
    +    Distributed Cache and Distcp. Also, provides a config mapreduce.job.hdfs-servers
    +    that the jobs can populate with a comma separated list of namenodes.
    +    The job client automatically fetches delegation tokens from those namenodes.
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestTokenCache.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/build.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/common/branches/branch-0.20-security-patches/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/build.xml?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/build.xml Fri Mar  4 03:45:19 2011
@@ -367,7 +367,7 @@
      
   </target>
 
-  <target name="compile-mapred-classes" depends="compile-core-classes">
+  <target name="compile-mapred-classes" depends="compile-core-classes,compile-hdfs-classes">
     <jsp-compile
      uriroot="${src.webapps}/task"
      outputdir="${build.src}"

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 03:45:19 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 
 /**
  * Manages a single machine's instance of a cross-job
@@ -725,4 +726,32 @@ public class TrackerDistributedCacheMana
   static void setFileVisibilities(Configuration conf, String booleans) {
     conf.set(JobContext.CACHE_FILE_VISIBILITIES, booleans);
   }
+  
+  /**
+   * For each archive or cache file - get the corresponding delegation token
+   * @param job
+   * @throws IOException
+   */
+  public static void getDelegationTokens(Configuration job) throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+
+    int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
+    Path[] ps = new Path[size];
+
+    int i = 0;
+    if (tarchives != null) {
+      for (i=0; i < tarchives.length; i++) {
+        ps[i] = new Path(tarchives[i].toString());
+      }
+    }
+
+    if (tfiles != null) {
+      for(int j=0; j< tfiles.length; j++) {
+        ps[i+j] = new Path(tfiles[j].toString());
+      }
+    }
+
+    TokenCache.obtainTokensForNamenodes(ps, job);
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:45:19 2011
@@ -26,25 +26,20 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
-import org.apache.log4j.LogManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
 
 /** 
  * The main() for child processes. 
@@ -72,8 +67,7 @@ class Child {
 
     // file name is passed thru env
     String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
-    defaultConf.set(JobContext.JOB_TOKEN_FILE, jobTokenFile);
-    TokenStorage ts = TokenCache.loadTaskTokenStorage(defaultConf);
+    TokenStorage ts = TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
     LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
 
@@ -155,7 +149,7 @@ class Child {
         JobConf job = new JobConf(task.getJobFile());
 
         // set job shuffle token
-         Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken();
+        Token<? extends TokenIdentifier> jt = ts.getJobToken();
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Fri Mar  4 03:45:19 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -152,6 +153,9 @@ public abstract class FileInputFormat<K,
       throw new IOException("No input paths specified in job");
     }
 
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(dirs, job);
+    
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Mar  4 03:45:19 2011
@@ -24,6 +24,7 @@ import java.text.NumberFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.Progressable;
 
 /** A base class for {@link OutputFormat}. */
@@ -106,6 +107,10 @@ public abstract class FileOutputFormat<K
       // normalize the output directory
       outDir = fs.makeQualified(outDir);
       setOutputPath(job, outDir);
+      
+      // get delegation token for the outDir's file system
+      TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+      
       // check its existence
       if (fs.exists(outDir)) {
         throw new FileAlreadyExistsException("Output directory " + outDir + 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 03:45:19 2011
@@ -64,7 +64,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -72,6 +72,8 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
@@ -639,6 +641,8 @@ public class JobClient extends Configure
     TrackerDistributedCacheManager.determineTimestamps(job);
     //  set the public/private visibility of the archives and files
     TrackerDistributedCacheManager.determineCacheVisibilities(job);
+    // get DelegationTokens for cache files
+    TrackerDistributedCacheManager.getDelegationTokens(job);
 
     String originalJarPath = job.getJar();
 
@@ -726,7 +730,12 @@ public class JobClient extends Configure
     job.set("mapreduce.job.dir", submitJobDir.toString());
     JobStatus status = null;
     try {
+      
       copyAndConfigureFiles(job, submitJobDir);
+      
+      // get delegation token for the dir
+      TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, job);
+      
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
       int reduces = job.getNumReduceTasks();
       JobContext context = new JobContext(job, jobId);
@@ -756,29 +765,13 @@ public class JobClient extends Configure
         out.close();
       }
       
-      // create TokenStorage object with user secretKeys
-      String tokensFileName = job.get("tokenCacheFile");
-      TokenStorage tokenStorage = null;
-      if(tokensFileName != null) {
-        LOG.info("loading secret keys from " + tokensFileName);
-        String localFileName = new Path(tokensFileName).toUri().getPath();
-        tokenStorage = new TokenStorage();
-        // read JSON
-        ObjectMapper mapper = new ObjectMapper();
-        Map<String, String> nm = 
-          mapper.readValue(new File(localFileName), Map.class);
-        
-        for(Map.Entry<String, String> ent: nm.entrySet()) {
-          LOG.debug("adding secret key alias="+ent.getKey());
-          tokenStorage.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
-        }
-      }
 
       //
       // Now, actually submit the job (using the submit name)
       //
+      populateTokenCache(job);
       status = jobSubmitClient.submitJob(
-          jobId, submitJobDir.toString(), tokenStorage);
+         jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
       if (status != null) {
         return new NetworkedJob(status);
       } else {
@@ -1789,5 +1782,45 @@ public class JobClient extends Configure
     int res = ToolRunner.run(new JobClient(), argv);
     System.exit(res);
   }
+  
+  //get secret keys and tokens and store them into TokenCache
+  @SuppressWarnings("unchecked")
+  private void populateTokenCache(Configuration conf) throws IOException{
+    // create TokenStorage object with user secretKeys
+    String tokensFileName = conf.get("tokenCacheFile");
+    if(tokensFileName != null) {
+      LOG.info("loading user's secret keys from " + tokensFileName);
+      String localFileName = new Path(tokensFileName).toUri().getPath();
+
+      boolean json_error = false;
+      try {
+        // read JSON
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> nm = 
+          mapper.readValue(new File(localFileName), Map.class);
+
+        for(Map.Entry<String, String> ent: nm.entrySet()) {
+          TokenCache.setSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+        }
+      } catch (JsonMappingException e) {
+        json_error = true;
+      } catch (JsonParseException e) {
+        json_error = true;
+      }
+      if(json_error)
+        LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
+    }
+
+    // add the delegation tokens from configuration
+    String [] nameNodes = conf.getStrings(JobContext.JOB_NAMENODES);
+    LOG.info("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+    if(nameNodes != null) {
+      Path [] ps = new Path[nameNodes.length];
+      for(int i=0; i< nameNodes.length; i++) {
+        ps[i] = new Path(nameNodes[i]);
+      }
+      TokenCache.obtainTokensForNamenodes(ps, conf);
+    }
+  }
 }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:45:19 2011
@@ -43,7 +43,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -3090,7 +3090,7 @@ class JobInProgress {
    */
   private void generateAndStoreTokens() throws IOException {
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+    Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
     // we need to create this file using the jobtracker's filesystem
     FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
     //create JobToken file and write token to it

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:45:19 2011
@@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
@@ -497,7 +498,7 @@ abstract class TaskRunner extends Thread
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
 
-    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    String jobTokenFile = conf.get(TokenCache.JOB_TOKEN_FILENAME);
     LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
     env.put("JOB_TOKEN_FILE", jobTokenFile);
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:45:19 2011
@@ -927,6 +927,7 @@ public class TaskTracker 
                               new LocalDirAllocator("mapred.local.dir");
 
   // intialize the job directory
+  @SuppressWarnings("unchecked")
   private void localizeJob(TaskInProgress tip) 
   throws IOException, InterruptedException {
     Path localJarFile = null;
@@ -955,7 +956,8 @@ public class TaskTracker 
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
 
-        TokenStorage ts = TokenCache.loadTokens(rjob.jobConf);
+        TokenStorage ts = TokenCache.loadTokens(rjob.jobConf.get(TokenCache.JOB_TOKEN_FILENAME), rjob.jobConf);
+
         Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken(); 
         getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
  
@@ -3707,7 +3709,7 @@ public class TaskTracker 
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory, 
-          jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
+          jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
       
       FileStatus status = null;
       long jobTokenSize = -1;
@@ -3724,7 +3726,7 @@ public class TaskTracker 
       // Download job_token
       systemFS.copyToLocalFile(skPath, localJobTokenFile);      
       // set it into jobConf to transfer the name to TaskRunner
-      jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
+      jobConf.set(TokenCache.JOB_TOKEN_FILENAME, localJobTokenFile.toString());
     }
 
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar  4 03:45:19 2011
@@ -48,7 +48,8 @@ public class JobContext {
   protected final org.apache.hadoop.mapred.JobConf conf;
   private final JobID jobId;
 
-  public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
+  public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
+  public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
 
   public static final String CACHE_FILE_VISIBILITIES = 
     "mapreduce.job.cache.files.visibilities";

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Mar  4 03:45:19 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -185,6 +186,9 @@ public abstract class FileInputFormat<K,
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
+    
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
 
     List<IOException> errors = new ArrayList<IOException>();
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Mar  4 03:45:19 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
@@ -119,6 +120,10 @@ public abstract class FileOutputFormat<K
     if (outDir == null) {
       throw new InvalidJobConfException("Output directory not set.");
     }
+    
+    // get delegation token for outDir's file system
+    TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
+
     if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 
                                            " already exists");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Mar  4 03:45:19 2011
@@ -38,10 +38,6 @@ import org.apache.hadoop.record.Utils;
 public class SecureShuffleUtils {
   public static final String HTTP_HEADER_URL_HASH = "UrlHash";
   public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-  /**
-   * file name used on HDFS for generated job token
-   */
-  public static final String JOB_TOKEN_FILENAME = "jobToken";
   
   /**
    * Base64 encoded hash of msg

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar  4 03:45:19 2011
@@ -19,16 +19,23 @@
 package org.apache.hadoop.mapreduce.security;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
@@ -40,6 +47,16 @@ import org.apache.hadoop.security.token.
 public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
+  /**
+   * file name used on HDFS for generated job token
+   */
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /**
+   * conf setting for job tokens cache file name
+   */
+
+  public static final String JOB_TOKEN_FILENAME = "mapreduce.job.jobTokenFile";
 
   private static TokenStorage tokenStorage;
   
@@ -72,6 +89,16 @@ public class TokenCache {
   }
   
   /**
+   * 
+   * @param namenode
+   * @return delegation token
+   */
+  @SuppressWarnings("unchecked")
+  public static Token<DelegationTokenIdentifier> getDelegationToken(String namenode) {
+    return (Token<DelegationTokenIdentifier>)getTokenStorage().getToken(new Text(namenode));
+  }
+
+  /**
    * auxiliary method 
    * @return all the available tokens
    */
@@ -109,12 +136,12 @@ public class TokenCache {
    * @throws IOException
    */
   //@InterfaceAudience.Private
-  public static TokenStorage loadTaskTokenStorage(JobConf conf)
+  public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
   throws IOException {
     if(tokenStorage != null)
       return tokenStorage;
     
-    tokenStorage = loadTokens(conf);
+    tokenStorage = loadTokens(fileName, conf);
     
     return tokenStorage;
   }
@@ -125,9 +152,8 @@ public class TokenCache {
    * @throws IOException
    */
   //@InterfaceAudience.Private
-  public static TokenStorage loadTokens(JobConf conf) 
+  public static TokenStorage loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
-    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
     Path localJobTokenFile = new Path (jobTokenFile);
     FileSystem localFS = FileSystem.getLocal(conf);
     FSDataInputStream in = localFS.open(localJobTokenFile);
@@ -135,9 +161,62 @@ public class TokenCache {
     TokenStorage ts = new TokenStorage();
     ts.readFields(in);
 
-    LOG.info("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
         +"; num of sec keys  = " + ts.numberOfSecretKeys());
+    }
     in.close();
     return ts;
   }
+
+  static String buildDTServiceName(URI uri) {
+    int port = uri.getPort();
+    if(port == -1) 
+      port = NameNode.DEFAULT_PORT;
+    
+    // build the service name string "/ip:port"
+    // for whatever reason using NetUtils.createSocketAddr(target).toString()
+    // returns "localhost/ip:port"
+    StringBuffer sb = new StringBuffer();
+    sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
+    return sb.toString();
+  }
+    
+  /**
+   * get Delegation for each distinct dfs for given paths.
+   * @param ps
+   * @param conf
+   * @throws IOException
+   */
+  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
+  throws IOException {
+    // get jobtracker principal id (for the renewer)
+    Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
+
+    for(Path p: ps) {
+      FileSystem fs = FileSystem.get(p.toUri(), conf);
+      if(fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        URI uri = fs.getUri();
+        String fs_addr = buildDTServiceName(uri);
+        
+        // see if we already have the token
+        Token<DelegationTokenIdentifier> token =
+          TokenCache.getDelegationToken(fs_addr);
+        if(token != null) {
+          LOG.debug("DT for " + token.getService()  + " is already present");
+          continue;
+        }
+        // get the token
+        token = dfs.getDelegationToken(jtCreds);
+        if(token==null)
+          throw new IOException("Token from " + fs_addr + " is null");
+
+        token.setService(new Text(fs_addr));
+        TokenCache.addDelegationToken(fs_addr, token);
+        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
+            ";t.service="+token.getService());
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenStorage.java Fri Mar  4 03:45:19 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.security.token.
 //@InterfaceAudience.Private
 public class TokenStorage implements Writable {
 
-  private static final Text SHUFFLE_JOB_TOKEN = new Text("ShuffleJobToken");
+  private static final Text JOB_TOKEN = new Text("ShuffleJobToken");
 
   private  Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
   private  Map<Text, Token<? extends TokenIdentifier>> tokenMap = 
@@ -74,7 +74,7 @@ public class TokenStorage implements Wri
    */
   //@InterfaceAudience.Private
   public void setJobToken(Token<? extends TokenIdentifier> t) {
-    setToken(SHUFFLE_JOB_TOKEN, t);
+    setToken(JOB_TOKEN, t);
   }
   
   /**
@@ -83,7 +83,7 @@ public class TokenStorage implements Wri
    */
   //@InterfaceAudience.Private
   public Token<? extends TokenIdentifier> getJobToken() {
-    return getToken(SHUFFLE_JOB_TOKEN);
+    return getToken(JOB_TOKEN);
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 03:45:19 2011
@@ -21,31 +21,28 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
-
-import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
 
 /**
  * Test to verify localization of a job and localization of a task on a
@@ -207,7 +204,7 @@ public class TestTaskTrackerLocalization
     if(!dir.exists())
       assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
 
-    File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+    File jobTokenFile = new File(dir, TokenCache.JOB_TOKEN_HDFS_FILE);
     FileOutputStream fos = new FileOutputStream(jobTokenFile);
     java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
     Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077141&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Mar  4 03:45:19 2011
@@ -0,0 +1,284 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+
+public class TestTokenCache {
+  private static final int NUM_OF_KEYS = 10;
+
+  // my sleep class - adds check for tokenCache
+  static class MySleepJob extends SleepJob {
+    /**
+     * attempts to access tokenCache as from client
+     */
+    @Override
+    public void map(IntWritable key, IntWritable value,
+        OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+        throws IOException {
+      // get token storage and a key
+      TokenStorage ts = TokenCache.getTokenStorage();
+      byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      int dts_size = 0;
+      if(dts != null)
+        dts_size = dts.size();
+
+      System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
+          "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
+          ";jobToken = " +  (ts==null? "n/a":ts.getJobToken()) +
+          "; alias1 key=" + new String(key1) + 
+          "; dts size= " + dts_size);
+    
+      for(Token<? extends TokenIdentifier> t : dts) {
+        System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
+      }
+
+      if(dts.size() != 2) { // one job token and one delegation token
+        throw new RuntimeException("tokens are not available"); // fail the test
+      }
+
+      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+        throw new RuntimeException("secret keys are not available"); // fail the test
+      }
+      
+      super.map(key, value, output, reporter);
+    }
+    
+    public JobConf setupJobConf(int numMapper, int numReducer, 
+        long mapSleepTime, int mapSleepCount, 
+        long reduceSleepTime, int reduceSleepCount) {
+      
+      JobConf job = super.setupJobConf(numMapper,numReducer, 
+          mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount);
+      
+      job.setMapperClass(MySleepJob.class);
+      
+      return job;
+    }
+  }
+  
+  private static MiniMRCluster mrCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), "sleepTest");
+  private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json");
+  private static int numSlaves = 1;
+  private static JobConf jConf;
+  private static ObjectMapper mapper = new ObjectMapper();
+  
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    jConf = new JobConf(conf);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, 
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, 
+        jConf);
+    
+    createTokenFileJson();
+    verifySecretKeysInJSONFile();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(mrCluster != null)
+      mrCluster.shutdown();
+    mrCluster = null;
+    if(dfsCluster != null)
+      dfsCluster.shutdown();
+    dfsCluster = null;
+  }
+
+  // create jason file and put some keys into it..
+  private static void createTokenFileJson() throws IOException {
+    Map<String, String> map = new HashMap<String, String>();
+    
+    try {
+      KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1");
+      for(int i=0; i<NUM_OF_KEYS; i++) {
+        SecretKeySpec key = (SecretKeySpec) kg.generateKey();
+        byte [] enc_key = key.getEncoded();
+        map.put("alias"+i, new String(Base64.encodeBase64(enc_key)));
+
+      }
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    
+    System.out.println("writing secret keys into " + tokenFileName);
+    try {
+      File p  = new File(tokenFileName.getParent().toString());
+      p.mkdirs();
+      // convert to JSON and save to the file
+      mapper.writeValue(new File(tokenFileName.toString()), map);
+
+    } catch (Exception e) {
+      System.out.println("failed with :" + e.getLocalizedMessage());
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static void verifySecretKeysInJSONFile() throws IOException {
+    Map<String, String> map;
+    map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
+    assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
+    
+    System.out.println("file " + tokenFileName + " verified; size="+ map.size());
+  }
+  
+  /**
+   * run a distributed job and verify that TokenCache is available
+   * @throws IOException
+   */
+  @Test
+  public void testTokenCache() throws IOException {
+    
+    System.out.println("running dist job");
+    
+    // make sure JT starts
+    jConf = mrCluster.createJobConf();
+    
+    // provide namenodes names for the job to get the delegation tokens for
+    //String nnUri = dfsCluster.getNameNode().getUri(namenode).toString();
+    NameNode nn = dfsCluster.getNameNode();
+    URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
+    jConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
+    // job tracker principle id..
+    jConf.set(JobContext.JOB_JOBTRACKER_ID, "jt_id");
+
+    // using argument to pass the file name
+    String[] args = {
+       "-tokenCacheFile", tokenFileName.toString(), 
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+        };
+     
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0", res, 0);
+  }
+  
+  /**
+   * run a local job and verify that TokenCache is available
+   * @throws NoSuchAlgorithmException
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
+    
+    System.out.println("running local job");
+    // this is local job
+    String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; 
+    jConf.set("tokenCacheFile", tokenFileName.toString());
+    
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("local Job failed");
+    }
+    assertEquals("local job res is not 0", res, 0);
+  }
+  
+  @Test
+  public void testGetTokensForNamenodes() throws IOException {
+    FileSystem fs = dfsCluster.getFileSystem();
+
+    Path p1 = new Path("file1");
+    Path p2 = new Path("file2");
+
+    p1 = fs.makeQualified(p1);
+    // do not qualify p2
+
+    TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
+
+    // this token is keyed by hostname:port key.
+    String fs_addr = TokenCache.buildDTServiceName(p1.toUri()); 
+    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
+    System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " +  nnt);
+
+    assertNotNull("Token for nn is null", nnt);
+
+    // verify the size
+    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    assertEquals("number of tokens is not 1", 1, tns.size());
+
+    boolean found = false;
+    for(Token<? extends TokenIdentifier> t: tns) {
+      System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
+
+      if(t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+          t.getService().equals(new Text(fs_addr))) {
+        found = true;
+      }
+      assertTrue("didn't find token for " + p1 ,found);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1077141&r1=1077140&r2=1077141&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java Fri Mar  4 03:45:19 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.mapred.RecordRe
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -623,6 +624,12 @@ public class DistCp implements Tool {
   private static void checkSrcPath(Configuration conf, List<Path> srcPaths
       ) throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
+    
+    // get tokens for all the required FileSystems..
+    Path[] ps = new Path[srcPaths.size()];
+    ps = srcPaths.toArray(ps);
+    TokenCache.obtainTokensForNamenodes(ps, conf);
+
     for (Path p : srcPaths) {
       FileSystem fs = p.getFileSystem(conf);
       if (!fs.exists(p)) {
@@ -1018,6 +1025,10 @@ public class DistCp implements Tool {
     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
 
     FileSystem dstfs = args.dst.getFileSystem(conf);
+    
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+    
     boolean dstExists = dstfs.exists(args.dst);
     boolean dstIsDir = false;
     if (dstExists) {