You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/09 22:27:13 UTC

svn commit: r962682 - in /hadoop/mapreduce/trunk: ./ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/...

Author: ddas
Date: Fri Jul  9 20:27:12 2010
New Revision: 962682

URL: http://svn.apache.org/viewvc?rev=962682&view=rev
Log:
MAPREDUCE-1528. Incorporates the changes to the credentials API done in HADOOP-6845. Also, introduces Credentials in JobConf, and in JobContext. Contributed by Jitendra Pandey and Arun Murthy.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul  9 20:27:12 2010
@@ -155,6 +155,10 @@ Trunk (unreleased changes)
     MAPREDUCE-1820. Fix InputSampler to clone sampled keys. (Alex Kozlov via
     cdouglas)
 
+    MAPREDUCE-1528. Incorporates the changes to the credentials API done in
+    HADOOP-6845. Also, introduces Credentials in JobConf, and in JobContext.
+    (Jitendra Pandey and Arun Murthy via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Fri Jul  9 20:27:12 2010
@@ -35,7 +35,7 @@ import org.apache.hadoop.tools.rumen.Job
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.mapred.SimulatorJobInProgress;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the
@@ -175,7 +175,7 @@ public class SimulatorJobTracker extends
 
   @Override
   public synchronized JobStatus submitJob(
-      JobID jobId, String jobSubmitDir, TokenStorage ts) 
+      JobID jobId, String jobSubmitDir, Credentials ts) 
   throws IOException {
     boolean loggingEnabled = LOG.isDebugEnabled();
     if (loggingEnabled) {

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Fri Jul  9 20:27:12 2010
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.JobPr
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskReport;
@@ -82,7 +82,7 @@ public class MockSimulatorJobTracker imp
 
   @Override
   public JobStatus submitJob(
-      JobID jobId, String jobSubmitDir, TokenStorage ts) throws IOException {
+      JobID jobId, String jobSubmitDir, Credentials ts) throws IOException {
     JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f,
         JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", "");
     return status;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Fri Jul  9 20:27:12 2010
@@ -34,7 +34,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -78,11 +78,11 @@ class Child {
     //load token cache storage
     String jobTokenFile = 
       System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    TokenStorage ts = 
-      TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
-    LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
+    Credentials credentials = 
+      TokenCache.loadTokens(jobTokenFile, defaultConf);
+    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
-    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
     jt.setService(new Text(address.getAddress().getHostAddress() + ":"
         + address.getPort()));
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
@@ -93,6 +93,9 @@ class Child {
      = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
     taskOwner.addToken(jt);
     
+    // Set the credentials
+    defaultConf.setCredentials(credentials);
+    
     final TaskUmbilicalProtocol umbilical = 
       taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
       @Override
@@ -177,7 +180,10 @@ class Child {
         //create the index file so that the log files 
         //are viewable immediately
         TaskLog.syncLogs(logLocation, taskid, isCleanup);
+        
+        // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
+        job.setCredentials(defaultConf.getCredentials());
         
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul  9 20:27:12 2010
@@ -187,7 +187,7 @@ public abstract class FileInputFormat<K,
     }
 
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(dirs, job);
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
     
     // Whether we need to recursive look into the directory structure
     boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Fri Jul  9 20:27:12 2010
@@ -118,7 +118,8 @@ public abstract class FileOutputFormat<K
       setOutputPath(job, outDir);
       
       // get delegation token for the outDir's file system
-      TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
+                                          new Path[] {outDir}, job);
       
       // check its existence
       if (fs.exists(outDir)) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Jul  9 20:27:12 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapred.lib.KeyF
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -322,6 +323,8 @@ public class JobConf extends Configurati
    */
   public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
 
+  private Credentials credentials = new Credentials();
+  
   /**
    * Configuration key to set the logging {@link Level} for the map task.
    *
@@ -369,6 +372,12 @@ public class JobConf extends Configurati
    */
   public JobConf(Configuration conf) {
     super(conf);
+    
+    if (conf instanceof JobConf) {
+      JobConf that = (JobConf)conf;
+      credentials = that.credentials;
+    }
+    
     checkAndWarnDeprecation();
   }
 
@@ -416,6 +425,18 @@ public class JobConf extends Configurati
   }
 
   /**
+   * Get credentials for the job.
+   * @return credentials for the job
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+  
+  void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+  
+  /**
    * Get the user jar for the map-reduce job.
    * 
    * @return the user jar for the map-reduce job.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jul  9 20:27:12 2010
@@ -71,7 +71,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -157,7 +157,7 @@ public class JobInProgress {
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
   
-  protected TokenStorage tokenStorage;
+  protected Credentials tokenStorage;
   
   JobHistory jobHistory;
 
@@ -390,7 +390,7 @@ public class JobInProgress {
   public JobInProgress(JobTracker jobtracker, 
                        final JobConf default_conf, int rCount,
                        JobInfo jobInfo,
-                       TokenStorage ts
+                       Credentials ts
                       ) throws IOException, InterruptedException {
     this.restartCount = rCount;
     this.jobId = JobID.downgrade(jobInfo.getJobID());
@@ -3695,7 +3695,7 @@ public class JobInProgress {
     
     // add this token to the tokenStorage
     if(tokenStorage == null)
-      tokenStorage = new TokenStorage();
+      tokenStorage = new Credentials();
     
     TokenCache.setJobToken(token, tokenStorage);
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jul  9 20:27:12 2010
@@ -101,7 +101,7 @@ import org.apache.hadoop.net.ScriptBased
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -2991,7 +2991,7 @@ public class JobTracker implements MRCon
   public synchronized 
   org.apache.hadoop.mapreduce.JobStatus 
     submitJob(org.apache.hadoop.mapreduce.JobID jobId, String jobSubmitDir,
-              TokenStorage ts
+              Credentials ts
               ) throws IOException, InterruptedException {
     return submitJob(JobID.downgrade(jobId), jobSubmitDir, ts);
   }
@@ -3004,12 +3004,11 @@ public class JobTracker implements MRCon
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    * @deprecated Use 
-   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, TokenStorage)}
+   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, Credentials)}
    *  instead
    */
   @Deprecated
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir,
-			     TokenStorage ts)
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throws IOException, InterruptedException {
 
     return submitJob(jobId, 0, UserGroupInformation.getCurrentUser(), 
@@ -3021,7 +3020,7 @@ public class JobTracker implements MRCon
    */
   private JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, 
 			      int restartCount, UserGroupInformation ugi, 
-			      String jobSubmitDir, boolean recovered, TokenStorage ts
+			      String jobSubmitDir, boolean recovered, Credentials ts
 			      )
       throws IOException, InterruptedException {
 
@@ -3393,9 +3392,11 @@ public class JobTracker implements MRCon
   }
 
   /**
-   * @see ClientProtocol#setJobPriority(org.apache.hadoop.mapreduce.JobID, String)
+   * Set the priority of a job
+   * @param jobid
+   * @param priority
+   * @throws IOException
    */
-  @Override
   public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID 
       jobid, String priority) throws IOException {
     setJobPriority(JobID.downgrade(jobid), priority);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jul  9 20:27:12 2010
@@ -53,7 +53,7 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -587,10 +587,12 @@ public class LocalJobRunner implements C
   }
 
   public org.apache.hadoop.mapreduce.JobStatus submitJob(
-      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir, TokenStorage ts) 
-      throws IOException {
-    TokenCache.setTokenStorage(ts);
-    return new Job(JobID.downgrade(jobid), jobSubmitDir).status;
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
+      Credentials credentials) throws IOException {
+    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
+    job.job.setCredentials(credentials);
+    return job.status;
+
   }
 
   public void killJob(org.apache.hadoop.mapreduce.JobID id) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Jul  9 20:27:12 2010
@@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.lib.r
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -592,7 +593,7 @@ abstract public class Task implements Wr
       } else {
         return split;
       }
-    }    
+    }  
     /** 
      * The communication thread handles communication with the parent (Task Tracker). 
      * It sends progress updates if progress has been made or if the task needs to 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jul  9 20:27:12 2010
@@ -82,7 +82,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -1036,7 +1036,7 @@ public class TaskTracker 
     String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
 
-    TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Jul  9 20:27:12 2010
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * A read-only view of the job that is provided to the tasks while they
@@ -43,6 +44,12 @@ public interface JobContext extends MRJo
   public Configuration getConfiguration();
 
   /**
+   * Get credentials for the job.
+   * @return credentials for the job
+   */
+  public Credentials getCredentials();
+
+  /**
    * Get the unique ID for the job.
    * @return the object with the job id
    */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Jul  9 20:27:12 2010
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -240,7 +241,8 @@ class JobSubmitter {
     //  set the public/private visibility of the archives and files
     TrackerDistributedCacheManager.determineCacheVisibilities(conf);
     // get DelegationToken for each cached file
-    TrackerDistributedCacheManager.getDelegationTokens(conf);
+    TrackerDistributedCacheManager.getDelegationTokens(conf, job
+        .getCredentials());
   }
   
   private URI getPathURI(Path destPath, String fragment) 
@@ -335,7 +337,8 @@ class JobSubmitter {
       LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
           + " as the submit dir");
       // get delegation token for the dir
-      TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir}, conf);
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+          new Path[] { submitJobDir }, conf);
       
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
@@ -354,9 +357,9 @@ class JobSubmitter {
       //
       // Now, actually submit the job (using the submit name)
       //
-      populateTokenCache(conf);
+      populateTokenCache(conf, job.getCredentials());
       status = submitClient.submitJob(
-          jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
+          jobId, submitJobDir.toString(), job.getCredentials());
       if (status != null) {
         return status;
       } else {
@@ -484,7 +487,8 @@ class JobSubmitter {
   
   // get secret keys and tokens and store them into TokenCache
   @SuppressWarnings("unchecked")
-  private void populateTokenCache(Configuration conf) throws IOException{
+  private void populateTokenCache(Configuration conf, Credentials credentials)
+      throws IOException {
     // create TokenStorage object with user secretKeys
     String tokensFileName = conf.get("tokenCacheFile");
     if(tokensFileName != null) {
@@ -499,7 +503,8 @@ class JobSubmitter {
           mapper.readValue(new File(localFileName), Map.class);
 
         for(Map.Entry<String, String> ent: nm.entrySet()) {
-          TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+          credentials.addSecretKey(new Text(ent.getKey()), ent.getValue()
+              .getBytes());
         }
       } catch (JsonMappingException e) {
         json_error = true;
@@ -518,7 +523,7 @@ class JobSubmitter {
       for(int i=0; i< nameNodes.length; i++) {
         ps[i] = new Path(nameNodes[i]);
       }
-      TokenCache.obtainTokensForNamenodes(ps, conf);
+      TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Fri Jul  9 20:27:12 2010
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.classification.InterfaceAudience;
 
@@ -694,9 +695,11 @@ public class TrackerDistributedCacheMana
   /**
    * For each archive or cache file - get the corresponding delegation token
    * @param job
+   * @param credentials
    * @throws IOException
    */
-  public static void getDelegationTokens(Configuration job) throws IOException {
+  public static void getDelegationTokens(Configuration job,
+      Credentials credentials) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     
@@ -716,7 +719,7 @@ public class TrackerDistributedCacheMana
       }
     }
     
-    TokenCache.obtainTokensForNamenodes(ps, job);
+    TokenCache.obtainTokensForNamenodes(credentials, ps, job);
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Fri Jul  9 20:27:12 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * A simple wrapper class that delegates most of its functionality to the
@@ -305,4 +306,9 @@ class ChainMapContextImpl<KEYIN, VALUEIN
     base.progress();
   }
 
+  @Override
+  public Credentials getCredentials() {
+    return base.getCredentials();
+  }
+
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Fri Jul  9 20:27:12 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.ReduceContext;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * A simple wrapper class that delegates most of its functionality to the
@@ -297,4 +298,9 @@ class ChainReduceContextImpl<KEYIN, VALU
   public void progress() {
     base.progress();
   }
+
+  @Override
+  public Credentials getCredentials() {
+    return base.getCredentials();
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul  9 20:27:12 2010
@@ -205,7 +205,8 @@ public abstract class FileInputFormat<K,
     }
     
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
+                                        job.getConfiguration());
 
     List<IOException> errors = new ArrayList<IOException>();
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Fri Jul  9 20:27:12 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * A {@link Mapper} which wraps a given one to allow custom 
@@ -306,5 +307,10 @@ public class WrappedMapper<KEYIN, VALUEI
     public String getUser() {
       return mapContext.getUser();
     }
+
+    @Override
+    public Credentials getCredentials() {
+      return mapContext.getCredentials();
+    }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Jul  9 20:27:12 2010
@@ -134,8 +134,9 @@ public static final String OUTDIR = "map
     }
 
     // get delegation token for outDir's file system
-    TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
-    
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { outDir }, job.getConfiguration());
+
     if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 
                                            " already exists");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Fri Jul  9 20:27:12 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Parti
 import org.apache.hadoop.mapreduce.ReduceContext;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
 
 /**
  * A {@link Reducer} which wraps a given one to allow for custom 
@@ -310,5 +311,10 @@ public class WrappedReducer<KEYIN, VALUE
     public String getUser() {
       return reduceContext.getUser();
     }
+
+    @Override
+    public Credentials getCredentials() {
+      return reduceContext.getCredentials();
+    }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Fri Jul  9 20:27:12 2010
@@ -39,8 +39,8 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -107,8 +107,9 @@ public interface ClientProtocol extends 
    * Version 31: Added TokenStorage to submitJob      
    * Version 32: Added delegation tokens (add, renew, cancel)
    * Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
+   * Version 34: Modified submitJob to use Credentials instead of TokenStorage.
    */
-  public static final long versionID = 33L;
+  public static final long versionID = 34L;
 
   /**
    * Allocate a name for the job.
@@ -121,8 +122,8 @@ public interface ClientProtocol extends 
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
    */
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts) 
-  throws IOException, InterruptedException;
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+      throws IOException, InterruptedException;
 
   /**
    * Get the current status of the cluster

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jul  9 20:27:12 2010
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,60 +57,36 @@ public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
 
-  private static TokenStorage tokenStorage;
   
   /**
    * auxiliary method to get user's secret keys..
    * @param alias
    * @return secret key from the storage
    */
-  public static byte[] getSecretKey(Text alias) {
-    if(tokenStorage == null)
+  public static byte[] getSecretKey(Credentials credentials, Text alias) {
+    if(credentials == null)
       return null;
-    return tokenStorage.getSecretKey(alias);
+    return credentials.getSecretKey(alias);
   }
   
   /**
-   * auxiliary methods to store user'  s secret keys
-   * @param alias
-   * @param key
-   */
-  public static void addSecretKey(Text alias, byte[] key) {
-    getTokenStorage().addSecretKey(alias, key);
-  }
-  
-  /**
-   * auxiliary method to add a delegation token
-   */
-  public static void addDelegationToken(
-      String namenode, Token<? extends TokenIdentifier> t) {
-    getTokenStorage().addToken(new Text(namenode), t);
-  }
-
-  /**
-   * auxiliary method 
-   * @return all the available tokens
-   */
-  public static Collection<Token<? extends TokenIdentifier>> getAllTokens() {
-    return getTokenStorage().getAllTokens();
-  }
-  /**
    * Convenience method to obtain delegation tokens from namenodes 
    * corresponding to the paths passed.
+   * @param credentials
    * @param ps array of paths
    * @param conf configuration
    * @throws IOException
    */
-  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
-  throws IOException {
+  public static void obtainTokensForNamenodes(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    obtainTokensForNamenodesInternal(ps, conf);
+    obtainTokensForNamenodesInternal(credentials, ps, conf);
   }
     
-  static void obtainTokensForNamenodesInternal(Path [] ps, Configuration conf)
-  throws IOException {
+  static void obtainTokensForNamenodesInternal(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
     // get jobtracker principal id (for the renewer)
     Text jtCreds = new Text(conf.get(JTConfig.JT_USER_NAME, ""));
     
@@ -123,7 +99,7 @@ public class TokenCache {
         
         // see if we already have the token
         Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(fs_addr); 
+          TokenCache.getDelegationToken(credentials, fs_addr); 
         if(token != null) {
           LOG.debug("DT for " + token.getService()  + " is already present");
           continue;
@@ -134,7 +110,7 @@ public class TokenCache {
           throw new IOException("Token from " + fs_addr + " is null");
 
         token.setService(new Text(fs_addr));
-        TokenCache.addDelegationToken(fs_addr, token);
+        credentials.addToken(new Text(fs_addr), token);
         LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
             ";t.service="+token.getService());
       }
@@ -161,50 +137,10 @@ public class TokenCache {
    */
   @SuppressWarnings("unchecked")
   @InterfaceAudience.Private
-  public static Token<DelegationTokenIdentifier> 
-  getDelegationToken(String namenode) {
-    return (Token<DelegationTokenIdentifier>)getTokenStorage().
-            getToken(new Text(namenode));
-  }
-
-  /**
-   * @return TokenStore object
-   */
-  @InterfaceAudience.Private
-  public static TokenStorage getTokenStorage() {
-    if(tokenStorage==null)
-      tokenStorage = new TokenStorage();
-    
-    return tokenStorage;
-  }
-  
-  /**
-   * sets TokenStorage
-   * @param ts
-   */
-  @InterfaceAudience.Private
-  public static void setTokenStorage(TokenStorage ts) {
-    if(tokenStorage != null)
-      LOG.warn("Overwriting existing token storage with # keys=" + 
-          tokenStorage.numberOfSecretKeys());
-    tokenStorage = ts;
-  }
-  
-  /**
-   * load token storage and stores it
-   * @param conf
-   * @return Loaded TokenStorage object
-   * @throws IOException
-   */
-  @InterfaceAudience.Private
-  public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
-  throws IOException {
-    if(tokenStorage != null)
-      return tokenStorage;
-    
-    tokenStorage = loadTokens(fileName, conf);
-    
-    return tokenStorage;
+  public static Token<DelegationTokenIdentifier> getDelegationToken(
+      Credentials credentials, String namenode) {
+    return (Token<DelegationTokenIdentifier>) credentials.getToken(new Text(
+        namenode));
   }
   
   /**
@@ -213,13 +149,13 @@ public class TokenCache {
    * @throws IOException
    */
   @InterfaceAudience.Private
-  public static TokenStorage loadTokens(String jobTokenFile, JobConf conf) 
+  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
     Path localJobTokenFile = new Path (jobTokenFile);
     FileSystem localFS = FileSystem.getLocal(conf);
     FSDataInputStream in = localFS.open(localJobTokenFile);
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     ts.readFields(in);
 
     if(LOG.isDebugEnabled()) {
@@ -235,8 +171,8 @@ public class TokenCache {
    */
   @InterfaceAudience.Private
   public static void setJobToken(Token<? extends TokenIdentifier> t, 
-      TokenStorage ts) {
-    ts.addToken(JOB_TOKEN, t);
+      Credentials credentials) {
+    credentials.addToken(JOB_TOKEN, t);
   }
   /**
    * 
@@ -244,8 +180,8 @@ public class TokenCache {
    */
   @SuppressWarnings("unchecked")
   @InterfaceAudience.Private
-  public static Token<JobTokenIdentifier> getJobToken(TokenStorage ts) {
-    return (Token<JobTokenIdentifier>) ts.getToken(JOB_TOKEN);
+  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
   }
   
   static String buildDTServiceName(URI uri) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Jul  9 20:27:12 2010
@@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -122,7 +122,7 @@ public class DelegationTokenRenewal {
   
   @SuppressWarnings("unchecked")
   public static synchronized void registerDelegationTokensForRenewal(
-      JobID jobId, TokenStorage ts, Configuration conf) {
+      JobID jobId, Credentials ts, Configuration conf) {
     if(ts==null)
       return; //nothing to add
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Fri Jul  9 20:27:12 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -55,10 +56,12 @@ public class JobContextImpl implements J
    * The UserGroupInformation object that has a reference to the current user
    */
   protected UserGroupInformation ugi;
+  protected final Credentials credentials;
   
   public JobContextImpl(Configuration conf, JobID jobId) {
     this.conf = new org.apache.hadoop.mapred.JobConf(conf);
     this.jobId = jobId;
+    this.credentials = this.conf.getCredentials();
     try {
       this.ugi = UserGroupInformation.getCurrentUser();
     } catch (IOException e) {
@@ -404,5 +407,9 @@ public class JobContextImpl implements J
   public String getUser() {
     return conf.getUser();
   }
+
+  public Credentials getCredentials() {
+    return credentials;
+  }
   
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jul  9 20:27:12 2010
@@ -46,8 +46,9 @@ import org.apache.hadoop.mapred.MiniMRCl
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
@@ -56,6 +57,7 @@ import org.codehaus.jackson.map.ObjectMa
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Assert;
 
 public class TestTokenCache {
   private static final int NUM_OF_KEYS = 10;
@@ -69,22 +71,13 @@ public class TestTokenCache {
     public void map(IntWritable key, IntWritable value, Context context)
     throws IOException, InterruptedException {
       // get token storage and a key
-      TokenStorage ts = TokenCache.getTokenStorage();
-      byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
-      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      Credentials ts = context.getCredentials();
+      byte[] key1 = ts.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
       int dts_size = 0;
       if(dts != null)
         dts_size = dts.size();
       
-      System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
-          "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
-          ";jobToken = " +  (ts==null? "n/a":TokenCache.getJobToken(ts)) +
-          "; alias1 key=" + new String(key1) + 
-          "; dts size= " + dts_size);
-    
-      for(Token<? extends TokenIdentifier> t : dts) {
-        System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
-      }
       
       if(dts.size() != 2) { // one job token and one delegation token
         throw new RuntimeException("tokens are not available"); // fail the test
@@ -109,8 +102,29 @@ public class TestTokenCache {
           reduceSleepTime, reduceSleepCount);
       
       job.setMapperClass(MySleepMapper.class);
+      //Populate tokens here because security is disabled.
+      populateTokens(job);
       return job;
     }
+    
+    private void populateTokens(Job job) {
+    // Credentials in the job will not have delegation tokens
+    // because security is disabled. Fetch delegation tokens
+    // and populate the credential in the job.
+      try {
+        Credentials ts = job.getCredentials();
+        Path p1 = new Path("file1");
+        p1 = p1.getFileSystem(job.getConfiguration()).makeQualified(p1);
+        Credentials cred = new Credentials();
+        TokenCache.obtainTokensForNamenodesInternal(cred, new Path[] { p1 },
+            job.getConfiguration());
+        for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+          ts.addToken(new Text("Hdfs"), t);
+        }
+      } catch (IOException e) {
+        Assert.fail("Exception " + e);
+      }
+    }
   }
   
   private static MiniMRCluster mrCluster;
@@ -142,9 +156,6 @@ public class TestTokenCache {
     p2 = new Path("file2");
     
     p1 = fs.makeQualified(p1);
-    // do not qualify p2
-    TokenCache.setTokenStorage(new TokenStorage());
-    TokenCache.obtainTokensForNamenodesInternal(new Path [] {p1, p2}, jConf);
   }
 
   @AfterClass
@@ -173,7 +184,6 @@ public class TestTokenCache {
       throw new IOException(e);
     }
     
-    System.out.println("writing secret keys into " + tokenFileName);
     try {
       File p  = new File(tokenFileName.getParent().toString());
       p.mkdirs();
@@ -190,8 +200,6 @@ public class TestTokenCache {
     Map<String, String> map;
     map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
     assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
-    
-    System.out.println("file " + tokenFileName + " verified; size="+ map.size());
   }
   
   /**
@@ -257,20 +265,23 @@ public class TestTokenCache {
   public void testGetTokensForNamenodes() throws IOException {
     FileSystem fs = dfsCluster.getFileSystem();
     
+    Credentials credentials = new Credentials();
+    TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1,
+        p2 }, jConf);
+
     // this token is keyed by hostname:port key.
     String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
-    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
+    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(
+        credentials, fs_addr);
     System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " +  nnt);
     assertNotNull("Token for nn is null", nnt);
     
     // verify the size
-    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
     assertEquals("number of tokens is not 1", 1, tns.size());
     
     boolean found = false;
     for(Token<? extends TokenIdentifier> t: tns) {
-      System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
-      
       if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
           t.getService().equals(new Text(fs_addr))) {
         found = true;

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java?rev=962682&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCacheOldApi.java Fri Jul  9 20:27:12 2010
@@ -0,0 +1,296 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.EmptyInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+@SuppressWarnings("deprecation")
+public class TestTokenCacheOldApi {
+  private static final int NUM_OF_KEYS = 10;
+
+  // my sleep class - adds check for tokenCache
+  static class MyDummyJob extends Configured implements Tool,
+      Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
+      Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
+      Partitioner<IntWritable, NullWritable> {
+    Credentials ts;
+
+    public void configure(JobConf job) {
+    }
+    
+    /**
+     * attempts to access tokenCache as from client
+     */
+    public void map(IntWritable key, IntWritable value,
+        OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+        throws IOException {
+      // get token storage and a key
+      byte[] key1 = ts.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
+      int dts_size = 0;
+      if(dts != null)
+        dts_size = dts.size();
+
+      if(dts.size() != 2) { // one job token and one delegation token
+        throw new RuntimeException("tokens are not available"); // fail the test
+      }
+
+      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+        throw new RuntimeException("secret keys are not available"); // fail the test
+      }
+      
+      output.collect(new IntWritable(1), NullWritable.get());
+    }
+    
+    public JobConf setupJobConf() {
+      
+      JobConf job = new JobConf(getConf(), MyDummyJob.class);
+      job.setNumMapTasks(1);
+      job.setNumReduceTasks(1);
+      job.setMapperClass(MyDummyJob.class);
+      job.setMapOutputKeyClass(IntWritable.class);
+      job.setMapOutputValueClass(NullWritable.class);
+      job.setReducerClass(MyDummyJob.class);
+      job.setOutputFormat(NullOutputFormat.class);
+      job.setInputFormat(EmptyInputFormat.class);
+      job.setPartitionerClass(MyDummyJob.class);
+      job.setSpeculativeExecution(false);
+      job.setJobName("Sleep job");
+      populateTokens(job);
+      return job;
+    }
+    
+    private void populateTokens(JobConf job) {
+      // Credentials in the job will not have delegation tokens
+      // because security is disabled. Fetch delegation tokens
+      // and populate the credential in the job.
+      try {
+        Credentials ts = job.getCredentials();
+        Path p1 = new Path("file1");
+        p1 = p1.getFileSystem(job).makeQualified(p1);
+        Credentials cred = new Credentials();
+        TokenCache.obtainTokensForNamenodesInternal(cred, new Path[] { p1 },
+            job);
+        for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+          ts.addToken(new Text("Hdfs"), t);
+        }
+      } catch (IOException e) {
+        Assert.fail("Exception " + e);
+      }
+    }
+
+    public void close() throws IOException {
+    }
+
+    public void reduce(IntWritable key, Iterator<NullWritable> values,
+        OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+        throws IOException {
+      return;
+    }
+
+    public int getPartition(IntWritable key, NullWritable value,
+        int numPartitions) {
+      return key.get() % numPartitions;
+    }
+
+    public int run(String[] args) throws Exception {
+      JobConf job = setupJobConf();
+      JobClient.runJob(job);
+      return 0;
+    }
+  }
+  
+  private static MiniMRCluster mrCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), "sleepTest");
+  private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json");
+  private static int numSlaves = 1;
+  private static JobConf jConf;
+  private static ObjectMapper mapper = new ObjectMapper();
+  private static Path p1;
+  private static Path p2;
+  
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    jConf = new JobConf(conf);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, 
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, 
+        jConf);
+    
+    createTokenFileJson();
+    verifySecretKeysInJSONFile();
+    dfsCluster.getNamesystem()
+				.getDelegationTokenSecretManager().startThreads();
+    FileSystem fs = dfsCluster.getFileSystem();
+    
+    p1 = new Path("file1");
+    p2 = new Path("file2");
+    p1 = fs.makeQualified(p1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(mrCluster != null)
+      mrCluster.shutdown();
+    mrCluster = null;
+    if(dfsCluster != null)
+      dfsCluster.shutdown();
+    dfsCluster = null;
+  }
+
+  // create jason file and put some keys into it..
+  private static void createTokenFileJson() throws IOException {
+    Map<String, String> map = new HashMap<String, String>();
+    
+    try {
+      KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1");
+      for(int i=0; i<NUM_OF_KEYS; i++) {
+        SecretKeySpec key = (SecretKeySpec) kg.generateKey();
+        byte [] enc_key = key.getEncoded();
+        map.put("alias"+i, new String(Base64.encodeBase64(enc_key)));
+
+      }
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+    
+    try {
+      File p  = new File(tokenFileName.getParent().toString());
+      p.mkdirs();
+      // convert to JSON and save to the file
+      mapper.writeValue(new File(tokenFileName.toString()), map);
+
+    } catch (Exception e) {
+      System.out.println("failed with :" + e.getLocalizedMessage());
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static void verifySecretKeysInJSONFile() throws IOException {
+    Map<String, String> map;
+    map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
+    assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
+  }
+  
+  /**
+   * run a distributed job and verify that TokenCache is available
+   * @throws IOException
+   */
+  @Test
+  public void testTokenCache() throws IOException {
+    // make sure JT starts
+    jConf = mrCluster.createJobConf();
+    
+    // provide namenodes names for the job to get the delegation tokens for
+    //String nnUri = dfsCluster.getNameNode().getUri(namenode).toString();
+    NameNode nn = dfsCluster.getNameNode();
+    URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
+    jConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
+    // job tracker principle id..
+    jConf.set(JobTracker.JT_USER_NAME, "jt_id");
+
+    // using argument to pass the file name
+    String[] args = {
+       "-tokenCacheFile", tokenFileName.toString(), 
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+        };
+     
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MyDummyJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      Assert.fail("Job failed");
+    }
+    assertEquals("dist job res is not 0", res, 0);
+  }
+  
+  /**
+   * run a local job and verify that TokenCache is available
+   * @throws NoSuchAlgorithmException
+   * @throws IOException
+   */
+  @Test
+  public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
+    // this is local job
+    String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; 
+    jConf.set("mapreduce.job.credentials.json", tokenFileName.toString());
+
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MyDummyJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with" + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("local Job failed");
+    }
+    assertEquals("local job res is not 0", res, 0);
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Jul  9 20:27:12 2010
@@ -38,7 +38,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
@@ -238,7 +238,7 @@ public class TestDelegationTokenRenewal 
     String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
     String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     
     // register the token for renewal
     ts.addToken(new Text(nn1), token1);
@@ -273,7 +273,7 @@ public class TestDelegationTokenRenewal 
     // add another token ( that expires in 2 secs). Then remove it, before
     // time is up.
     // Wait for 3 secs , and make sure no renews were called
-    ts = new TokenStorage();
+    ts = new Credentials();
     MyToken token4 = dfs.getDelegationToken(new Text("user4"));
     
     //to cause this one to be set for renew in 2 secs

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=962682&r1=962681&r2=962682&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Fri Jul  9 20:27:12 2010
@@ -739,15 +739,15 @@ public class DistCp implements Tool {
   }
 
   /** Sanity check for srcPath */
-  private static void checkSrcPath(Configuration conf, List<Path> srcPaths
-      ) throws IOException {
+  private static void checkSrcPath(Configuration conf, List<Path> srcPaths,
+      JobConf jobConf) throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
     List<Path> unglobbed = new LinkedList<Path>();
     
     // get tokens for all the required FileSystems..
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
-    TokenCache.obtainTokensForNamenodes(ps, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
     
     
     for (Path p : srcPaths) {
@@ -779,9 +779,10 @@ public class DistCp implements Tool {
     if (!args.dryrun || args.flags.contains(Options.UPDATE)) {
       LOG.info("destPath=" + args.dst);
     }
-    checkSrcPath(conf, args.srcs);
 
     JobConf job = createJobConf(conf);
+    
+    checkSrcPath(conf, args.srcs, job);
     if (args.preservedAttributes != null) {
       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
     }
@@ -1230,7 +1231,8 @@ public class DistCp implements Tool {
     FileSystem dstfs = args.dst.getFileSystem(conf);
     
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), 
+                                        new Path[] {args.dst}, conf);
     
     
     boolean dstExists = dstfs.exists(args.dst);