You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:56:24 UTC

svn commit: r1077251 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/security/ core/org/apache/hadoop/security/authorize/ hdfs/org/apache/hadoop/hdfs/server/namenode/ hdfs/org/apache/hadoop/hdfs/tools/ mapred/org/a...

Author: omalley
Date: Fri Mar  4 03:56:23 2011
New Revision: 1077251

URL: http://svn.apache.org/viewvc?rev=1077251&view=rev
Log:
commit 2839699a7d8d8bb9141cd00291414ff02ddc5168
Author: Arun C Murthy <ac...@apache.org>
Date:   Sat Feb 27 03:26:42 2010 -0800

    MAPREDUCE-1528 from https://issues.apache.org/jira/secure/attachment/12437339/MAPREDUCE-1528_yhadoop20.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestJobCredentials.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/TokenStorage.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestTokenStorage.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Reporter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java?rev=1077251&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java Fri Mar  4 03:56:23 2011
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An in-memory, {@link Writable} store of secret keys and {@link Token}s,
+ * each keyed by a {@link Text} alias.
+ */
+public class Credentials implements Writable {
+
+  private final Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
+  private final Map<Text, Token<? extends TokenIdentifier>> tokenMap =
+    new HashMap<Text, Token<? extends TokenIdentifier>>();
+
+  /**
+   * Returns the key bytes for the alias
+   * @param alias the alias for the key
+   * @return key for this alias, or null if no key is stored under it
+   */
+  public byte[] getSecretKey(Text alias) {
+    return secretKeysMap.get(alias);
+  }
+
+  /**
+   * Returns the Token object for the alias
+   * @param alias the alias for the Token
+   * @return token for this alias, or null if no token is stored under it
+   */
+  public Token<? extends TokenIdentifier> getToken(Text alias) {
+    return tokenMap.get(alias);
+  }
+
+  /**
+   * Add a token in the storage (in memory), replacing any previous entry
+   * @param alias the alias for the token
+   * @param t the token object
+   */
+  public void addToken(Text alias, Token<? extends TokenIdentifier> t) {
+    tokenMap.put(alias, t);
+  }
+
+  /**
+   * @return live view of all the tokens in the in-memory map (not a copy)
+   */
+  public Collection<Token<? extends TokenIdentifier>> getAllTokens() {
+    return tokenMap.values();
+  }
+
+  /**
+   * @return number of Tokens in the in-memory map
+   */
+  public int numberOfTokens() {
+    return tokenMap.size();
+  }
+
+  /**
+   * @return number of keys in the in-memory map
+   */
+  public int numberOfSecretKeys() {
+    return secretKeysMap.size();
+  }
+
+  /**
+   * Set the key for an alias, replacing any previous entry
+   * @param alias the alias for the key
+   * @param key the key bytes
+   */
+  public void addSecretKey(Text alias, byte[] key) {
+    secretKeysMap.put(alias, key);
+  }
+
+  /**
+   * Reads Credentials from a local file and adds every Token in it to the UGI
+   * @param filename path of the local file holding serialized Credentials
+   * @param conf configuration used to obtain the local file system
+   * @param ugi the UGI that receives every token read from the file
+   * @throws IOException if the file cannot be opened, read or closed
+   */
+  public static void readTokensAndLoadInUGI(String filename, Configuration conf,
+      UserGroupInformation ugi) throws IOException {
+    FSDataInputStream in = FileSystem.getLocal(conf).open(new Path(filename));
+    try {
+      Credentials ts = new Credentials();
+      ts.readFields(in);
+      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+        ugi.addToken(token);
+      }
+    } finally {
+      // always release the file handle, even when readFields fails
+      in.close();
+    }
+  }
+  /**
+   * Stores all the tokens, then all the secret keys, to DataOutput
+   * @param out the output to serialize to
+   * @throws IOException if writing to out fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write out tokens first
+    WritableUtils.writeVInt(out, tokenMap.size());
+    for(Map.Entry<Text,
+        Token<? extends TokenIdentifier>> e: tokenMap.entrySet()) {
+      e.getKey().write(out);
+      e.getValue().write(out);
+    }
+
+    // now write out secret keys
+    WritableUtils.writeVInt(out, secretKeysMap.size());
+    for(Map.Entry<Text, byte[]> e : secretKeysMap.entrySet()) {
+      e.getKey().write(out);
+      WritableUtils.writeCompressedByteArray(out, e.getValue());
+    }
+  }
+
+  /**
+   * Loads all the tokens and secret keys, discarding the current contents
+   * @param in the input to deserialize from, in the order used by write
+   * @throws IOException if reading from in fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    secretKeysMap.clear();
+    tokenMap.clear();
+    int size = WritableUtils.readVInt(in);
+    for(int i=0; i<size; i++) {
+      Text alias = new Text();
+      alias.readFields(in);
+      Token<? extends TokenIdentifier> t = new Token<TokenIdentifier>();
+      t.readFields(in);
+      tokenMap.put(alias, t);
+    }
+
+    size = WritableUtils.readVInt(in);
+    for(int i=0; i<size; i++) {
+      Text alias = new Text();
+      alias.readFields(in);
+      byte[] key = WritableUtils.readCompressedByteArray(in);
+      secretKeysMap.put(alias, key);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar  4 03:56:23 2011
@@ -366,7 +366,7 @@ public class UserGroupInformation {
         loginUser = new UserGroupInformation(login.getSubject());
         String tokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
         if (tokenFile != null && isSecurityEnabled()) {
-          TokenStorage.readTokensAndLoadInUGI(tokenFile, new Configuration(), loginUser);
+          Credentials.readTokensAndLoadInUGI(tokenFile, new Configuration(), loginUser);
         }
       } catch (LoginException le) {
         throw new IOException("failure to login", le);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java Fri Mar  4 03:56:23 2011
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -41,7 +42,7 @@ public class ServiceAuthorizationManager
    * 
    * @deprecated Use
    *             {@link CommonConfigurationKeys#HADOOP_SECURITY_AUTHORIZATION}
-   *             Instead.
+   *             instead.
    */
   @Deprecated
   public static final String SERVICE_AUTHORIZATION_CONFIG = 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java Fri Mar  4 03:56:23 2011
@@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
@@ -73,7 +73,7 @@ public class DelegationTokenServlet exte
           String s = NameNode.getAddress(conf).getAddress().getHostAddress()
                      + ":" + NameNode.getAddress(conf).getPort();
           token.setService(new Text(s));
-          TokenStorage ts = new TokenStorage();
+          Credentials ts = new Credentials();
           ts.addToken(new Text(ugi.getShortUserName()), token);
           ts.write(dosFinal);
           dosFinal.close();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Fri Mar  4 03:56:23 2011
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.DelegationTokenServlet;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -123,7 +123,7 @@ public class DelegationTokenFetcher {
       + ":" + dfs.getUri().getPort();
     token.setService(new Text(nnAddress));
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     ts.addToken(new Text(shortName), token);
     ts.write(out);
   }
@@ -151,7 +151,7 @@ public class DelegationTokenFetcher {
       URLConnection connection = remoteURL.openConnection();
       
       InputStream in = connection.getInputStream();
-      TokenStorage ts = new TokenStorage();
+      Credentials ts = new Credentials();
       dis = new DataInputStream(in);
       ts.readFields(dis);
       file = new DataOutputStream(new FileOutputStream(filename));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 03:56:23 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 
@@ -743,9 +744,12 @@ public class TrackerDistributedCacheMana
   /**
    * For each archive or cache file - get the corresponding delegation token
    * @param job
+   * @param credentials
    * @throws IOException
    */
-  public static void getDelegationTokens(Configuration job) throws IOException {
+  public static void getDelegationTokens(Configuration job, 
+                                         Credentials credentials) 
+  throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
 
@@ -765,6 +769,6 @@ public class TrackerDistributedCacheMana
       }
     }
 
-    TokenCache.obtainTokensForNamenodes(ps, job);
+    TokenCache.obtainTokensForNamenodes(credentials, ps, job);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:56:23 2011
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
@@ -71,12 +71,12 @@ class Child {
     // file name is passed thru env
     String jobTokenFile = 
       System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    TokenStorage ts = 
-      TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
-    LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
+    Credentials credentials = 
+      TokenCache.loadTokens(jobTokenFile, defaultConf);
+    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
     
-    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
     jt.setService(new Text(address.getAddress().getHostAddress() + ":"
         + address.getPort()));
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
@@ -86,6 +86,9 @@ class Child {
      = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
     taskOwner.addToken(jt);
     
+    // Set the credentials
+    defaultConf.setCredentials(credentials);
+    
     final TaskUmbilicalProtocol umbilical = 
       taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
         @Override
@@ -170,7 +173,10 @@ class Child {
         //create the index file so that the log files 
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        
+        // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
+        job.setCredentials(defaultConf.getCredentials());
 
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Fri Mar  4 03:56:23 2011
@@ -157,7 +157,7 @@ public abstract class FileInputFormat<K,
     }
 
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(dirs, job);
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
     
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Mar  4 03:56:23 2011
@@ -109,7 +109,8 @@ public abstract class FileOutputFormat<K
       setOutputPath(job, outDir);
       
       // get delegation token for the outDir's file system
-      TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job);
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
+                                          new Path[] {outDir}, job);
       
       // check its existence
       if (fs.exists(outDir)) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 03:56:23 2011
@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -661,7 +662,8 @@ public class JobClient extends Configure
     //  set the public/private visibility of the archives and files
     TrackerDistributedCacheManager.determineCacheVisibilities(job);
     // get DelegationTokens for cache files
-    TrackerDistributedCacheManager.getDelegationTokens(job);
+    TrackerDistributedCacheManager.getDelegationTokens(job, 
+                                                       job.getCredentials());
 
     String originalJarPath = job.getJar();
 
@@ -761,8 +763,9 @@ public class JobClient extends Configure
           copyAndConfigureFiles(jobCopy, submitJobDir);
 
           // get delegation token for the dir
-          TokenCache.obtainTokensForNamenodes(new Path [] {submitJobDir},
-              jobCopy);
+          TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
+                                              new Path [] {submitJobDir},
+                                              jobCopy);
 
           Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
           int reduces = jobCopy.getNumReduceTasks();
@@ -801,9 +804,9 @@ public class JobClient extends Configure
           //
           // Now, actually submit the job (using the submit name)
           //
-          populateTokenCache(jobCopy);
+          populateTokenCache(jobCopy, jobCopy.getCredentials());
           status = jobSubmitClient.submitJob(
-              jobId, submitJobDir.toString(), TokenCache.getTokenStorage());
+              jobId, submitJobDir.toString(), jobCopy.getCredentials());
           if (status != null) {
             return new NetworkedJob(status);
           } else {
@@ -1877,7 +1880,8 @@ public class JobClient extends Configure
   
   //get secret keys and tokens and store them into TokenCache
   @SuppressWarnings("unchecked")
-  private void populateTokenCache(Configuration conf) throws IOException{
+  private void populateTokenCache(Configuration conf, Credentials credentials) 
+  throws IOException{
     // create TokenStorage object with user secretKeys
     String tokensFileName = conf.get("tokenCacheFile");
     if(tokensFileName != null) {
@@ -1892,7 +1896,8 @@ public class JobClient extends Configure
           mapper.readValue(new File(localFileName), Map.class);
 
         for(Map.Entry<String, String> ent: nm.entrySet()) {
-          TokenCache.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+          credentials.addSecretKey(new Text(ent.getKey()), 
+                                   ent.getValue().getBytes());
         }
       } catch (JsonMappingException e) {
         json_error = true;
@@ -1911,7 +1916,7 @@ public class JobClient extends Configure
       for(int i=0; i< nameNodes.length; i++) {
         ps[i] = new Path(nameNodes[i]);
       }
-      TokenCache.obtainTokensForNamenodes(ps, conf);
+      TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
     }
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Mar  4 03:56:23 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.lib.Iden
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -314,6 +315,8 @@ public class JobConf extends Configurati
   public static final String MAPRED_REDUCE_TASK_ENV =
     "mapred.reduce.child.env";
 
+  private Credentials credentials = new Credentials();
+  
   /**
    * Construct a map/reduce job configuration.
    */
@@ -338,6 +341,12 @@ public class JobConf extends Configurati
    */
   public JobConf(Configuration conf) {
     super(conf);
+    
+    if (conf instanceof JobConf) {
+      JobConf that = (JobConf)conf;
+      credentials = that.credentials;
+    }
+    
     checkAndWarnDeprecation();
   }
 
@@ -385,6 +394,18 @@ public class JobConf extends Configurati
   }
 
   /**
+   * Get credentials for the job.
+   * @return credentials for the job
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+  
+  void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+  
+  /**
    * Get the user jar for the map-reduce job.
    * 
    * @return the user jar for the map-reduce job.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:56:23 2011
@@ -60,7 +60,7 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -134,7 +134,7 @@ public class JobInProgress {
   JobPriority priority = JobPriority.NORMAL;
   final JobTracker jobtracker;
   
-  protected TokenStorage tokenStorage;
+  protected Credentials tokenStorage;
 
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -335,7 +335,7 @@ public class JobInProgress {
   }
 
   JobInProgress(JobTracker jobtracker, final JobConf default_conf, 
-      JobInfo jobInfo, int rCount, TokenStorage ts) 
+      JobInfo jobInfo, int rCount, Credentials ts) 
   throws IOException, InterruptedException {
     this.restartCount = rCount;
     this.jobId = JobID.downgrade(jobInfo.getJobID());
@@ -3218,7 +3218,7 @@ public class JobInProgress {
     
     // add this token to the tokenStorage
     if(tokenStorage == null)
-      tokenStorage = new TokenStorage();
+      tokenStorage = new Credentials();
 
     TokenCache.setJobToken(token, tokenStorage);
         

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar  4 03:56:23 2011
@@ -25,7 +25,7 @@ import org.apache.hadoop.ipc.VersionedPr
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -91,7 +91,7 @@ interface JobSubmissionProtocol extends 
    * that job.
    * The job files should be submitted in <b>jobSubmitDir</b>.
    */
-  public JobStatus submitJob(JobID jobName, String jobSubmitDir, TokenStorage ts) 
+  public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) 
   throws IOException;
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:56:23 2011
@@ -108,7 +108,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -206,7 +206,7 @@ public class JobTracker implements MRCon
   
   private Clock clock;
 
-  private TokenStorage tokenStorage;
+  private Credentials tokenStorage;
   private final JobTokenSecretManager jobTokenSecretManager
     = new JobTokenSecretManager();
 
@@ -3632,7 +3632,7 @@ public class JobTracker implements MRCon
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    */
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throws IOException {
     JobInfo jobInfo = null;
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -3986,9 +3986,6 @@ public class JobTracker implements MRCon
     }
   }
   
-  /**
-   * @see ClientProtocol#setJobPriority(JobID, String)
-   */
   public synchronized void setJobPriority(JobID jobid, 
                                           String priority)
                                           throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:56:23 2011
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
@@ -409,10 +409,12 @@ class LocalJobRunner implements JobSubmi
     return new JobID("local", ++jobid);
   }
 
-  public JobStatus submitJob(JobID jobid, String jobSubmitDir, TokenStorage ts) 
+  public JobStatus submitJob(JobID jobid, String jobSubmitDir, 
+                             Credentials credentials) 
   throws IOException {
-    TokenCache.setTokenStorage(ts);
-    return new Job(jobid, jobSubmitDir).status;
+    Job job = new Job(jobid, jobSubmitDir);
+    job.job.setCredentials(credentials);
+    return job.status;
   }
 
   public void killJob(JobID id) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Reporter.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Reporter.java Fri Mar  4 03:56:23 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progressable;
 
 /** 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:56:23 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -579,7 +580,7 @@ abstract public class Task implements Wr
       } else {
         return split;
       }
-    }    
+    }  
     /** 
      * The communication thread handles communication with the parent (Task Tracker). 
      * It sends progress updates if progress has been made or if the task needs to 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:56:23 2011
@@ -103,7 +103,7 @@ import org.apache.hadoop.util.VersionInf
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -1019,7 +1019,7 @@ public class TaskTracker 
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
     
     
-    TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar  4 03:56:23 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -47,6 +48,7 @@ public class JobContext {
     "mapreduce.partitioner.class";
 
   protected final org.apache.hadoop.mapred.JobConf conf;
+  protected final Credentials credentials;
   private final JobID jobId;
 
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
@@ -67,6 +69,7 @@ public class JobContext {
   
   public JobContext(Configuration conf, JobID jobId) {
     this.conf = new org.apache.hadoop.mapred.JobConf(conf);
+    this.credentials = this.conf.getCredentials();
     this.jobId = jobId;
     try {
       this.ugi = UserGroupInformation.getCurrentUser();
@@ -84,6 +87,14 @@ public class JobContext {
   }
 
   /**
+   * Get credentials for the job.
+   * @return credentials for the job
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  /**
    * Get the unique ID for the job.
    * @return the object with the job id
    */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Mar  4 03:56:23 2011
@@ -193,7 +193,8 @@ public abstract class FileInputFormat<K,
     }
     
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(dirs, job.getConfiguration());
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
+                                        job.getConfiguration());
 
     List<IOException> errors = new ArrayList<IOException>();
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Mar  4 03:56:23 2011
@@ -122,7 +122,9 @@ public abstract class FileOutputFormat<K
     }
     
     // get delegation token for outDir's file system
-    TokenCache.obtainTokensForNamenodes(new Path[] {outDir}, job.getConfiguration());
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
+                                        new Path[] {outDir}, 
+                                        job.getConfiguration());
 
     if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
       throw new FileAlreadyExistsException("Output directory " + outDir + 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar  4 03:56:23 2011
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapred.JobTrack
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,60 +53,35 @@ public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
 
-  private static TokenStorage tokenStorage;
-  
   /**
    * auxiliary method to get user's secret keys..
    * @param alias
    * @return secret key from the storage
    */
-  public static byte[] getSecretKey(Text alias) {
-    if(tokenStorage == null)
+  public static byte[] getSecretKey(Credentials credentials, Text alias) {
+    if(credentials == null)
       return null;
-    return tokenStorage.getSecretKey(alias);
-  }
-  
-  /**
-   * auxiliary methods to store user'  s secret keys
-   * @param alias
-   * @param key
-   */
-  public static void addSecretKey(Text alias, byte[] key) {
-    getTokenStorage().addSecretKey(alias, key);
-  }
-  
-  /**
-   * auxiliary method to add a delegation token
-   */
-  public static void addDelegationToken(
-      String namenode, Token<? extends TokenIdentifier> t) {
-    getTokenStorage().addToken(new Text(namenode), t);
+    return credentials.getSecretKey(alias);
   }
 
   /**
-   * auxiliary method 
-   * @return all the available tokens
-   */
-  public static Collection<Token<? extends TokenIdentifier>> getAllTokens() {
-    return getTokenStorage().getAllTokens();
-  }
-  
-  /**
    * Convenience method to obtain delegation tokens from namenodes 
    * corresponding to the paths passed.
    * @param ps array of paths
    * @param conf configuration
    * @throws IOException
    */
-  public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
+  public static void obtainTokensForNamenodes(Credentials credentials,
+                                              Path [] ps, Configuration conf) 
   throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    obtainTokensForNamenodesInternal(ps, conf);
+    obtainTokensForNamenodesInternal(credentials, ps, conf);
   }
 
-  static void obtainTokensForNamenodesInternal(Path [] ps, Configuration conf)
+  static void obtainTokensForNamenodesInternal(Credentials credentials,
+                                               Path [] ps, Configuration conf)
   throws IOException {
     // get jobtracker principal id (for the renewer)
     Text jtCreds = new Text(conf.get(JobTracker.JT_USER_NAME, ""));
@@ -120,7 +95,7 @@ public class TokenCache {
 
         // see if we already have the token
         Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(fs_addr); 
+          TokenCache.getDelegationToken(credentials, fs_addr); 
         if(token != null) {
           LOG.debug("DT for " + token.getService()  + " is already present");
           continue;
@@ -131,7 +106,7 @@ public class TokenCache {
           throw new IOException("Token from " + fs_addr + " is null");
 
         token.setService(new Text(fs_addr));
-        TokenCache.addDelegationToken(fs_addr, token);
+        credentials.addToken(new Text(fs_addr), token);
         LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
             ";t.service="+token.getService());
       }
@@ -159,64 +134,24 @@ public class TokenCache {
   @SuppressWarnings("unchecked")
   //@InterfaceAudience.Private
   public static Token<DelegationTokenIdentifier> 
-  getDelegationToken(String namenode) {
-    return (Token<DelegationTokenIdentifier>)getTokenStorage().
-    getToken(new Text(namenode));
+  getDelegationToken(Credentials credentials, String namenode) {
+    return (Token<DelegationTokenIdentifier>)
+        credentials.getToken(new Text(namenode));
   }
 
   /**
-   * @return TokenStore object
-   */
-  //@InterfaceAudience.Private
-  public static TokenStorage getTokenStorage() {
-    if(tokenStorage==null)
-      tokenStorage = new TokenStorage();
-
-    return tokenStorage;
-  }
-
-  /**
-   * sets TokenStorage
-   * @param ts
-   */
-  //@InterfaceAudience.Private
-  public static void setTokenStorage(TokenStorage ts) {
-    if(tokenStorage != null)
-      LOG.warn("Overwriting existing token storage with # keys=" + 
-          tokenStorage.numberOfSecretKeys());
-    tokenStorage = ts;
-  }
-  
-  /**
-   * load token storage and stores it
-   * @param conf
-   * @return Loaded TokenStorage object
-   * @throws IOException
-   */
-  //@InterfaceAudience.Private
-  public static TokenStorage loadTaskTokenStorage(String fileName, JobConf conf)
-  throws IOException {
-    if(tokenStorage != null)
-      return tokenStorage;
-    
-    tokenStorage = loadTokens(fileName, conf);
-    
-    return tokenStorage;
-  }
-  
-  /**
    * load job token from a file
    * @param conf
    * @throws IOException
    */
   //@InterfaceAudience.Private
-  public static TokenStorage loadTokens(String jobTokenFile, JobConf conf) 
+  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
     Path localJobTokenFile = new Path (jobTokenFile);
     FileSystem localFS = FileSystem.getLocal(conf);
     FSDataInputStream in = localFS.open(localJobTokenFile);
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     ts.readFields(in);
 
     if(LOG.isDebugEnabled()) {
@@ -233,8 +168,8 @@ public class TokenCache {
    */
   //@InterfaceAudience.Private
   public static void setJobToken(Token<? extends TokenIdentifier> t, 
-      TokenStorage ts) {
-    ts.addToken(JOB_TOKEN, t);
+      Credentials credentials) {
+    credentials.addToken(JOB_TOKEN, t);
   }
   /**
    * 
@@ -242,8 +177,8 @@ public class TokenCache {
    */
   //@InterfaceAudience.Private
   @SuppressWarnings("unchecked")
-  public static Token<JobTokenIdentifier> getJobToken(TokenStorage ts) {
-    return (Token<JobTokenIdentifier>) ts.getToken(JOB_TOKEN);
+  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Mar  4 03:56:23 2011
@@ -38,7 +38,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -110,7 +110,7 @@ public class DelegationTokenRenewal {
   
   @SuppressWarnings("unchecked")
   public static synchronized void registerDelegationTokensForRenewal(
-      JobID jobId, TokenStorage ts, Configuration conf) {
+      JobID jobId, Credentials ts, Configuration conf) {
     if(ts==null)
       return; //nothing to add
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Mar  4 03:56:23 2011
@@ -43,13 +43,14 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -57,6 +58,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -65,7 +67,27 @@ public class TestTokenCache {
   private static final int NUM_OF_KEYS = 10;
 
   // my sleep class - adds check for tokenCache
-  static class MySleepJob extends SleepJob {
+  static class MySleepJob extends SleepJob implements JobConfigurable {
+    Credentials ts;
+    
+    public void configure(JobConf job) {
+        //Credentials in the job will not have delegation tokens
+        //because security is disabled. Fetch delegation tokens
+        //and populate the credential in the job.
+    	try {
+          ts = job.getCredentials();
+          Path p1 = new Path("file1");
+          p1 = p1.getFileSystem(job).makeQualified(p1);
+          Credentials cred = new Credentials();
+          TokenCache.obtainTokensForNamenodesInternal(cred, new Path [] {p1}, job);
+          for (Token<? extends TokenIdentifier> t: cred.getAllTokens()) {
+            ts.addToken(new Text("Hdfs"), t);
+          }
+    	} catch (IOException e) {
+    		Assert.fail("Exception "+e);
+    	}
+    }
+    
     /**
      * attempts to access tokenCache as from client
      */
@@ -74,23 +96,12 @@ public class TestTokenCache {
         OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
         throws IOException {
       // get token storage and a key
-      TokenStorage ts = TokenCache.getTokenStorage();
-      byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
-      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      byte[] key1 = ts.getSecretKey(new Text("alias1"));
+      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
       int dts_size = 0;
       if(dts != null)
         dts_size = dts.size();
 
-      System.out.println("inside MAP: ts==NULL?=" + (ts==null) + 
-          "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) + 
-          ";jobToken = " +  (ts==null? "n/a":TokenCache.getJobToken(ts)) +
-          "; alias1 key=" + new String(key1) + 
-          "; dts size= " + dts_size);
-    
-      for(Token<? extends TokenIdentifier> t : dts) {
-        System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
-      }
-
       if(dts.size() != 2) { // one job token and one delegation token
         throw new RuntimeException("tokens are not available"); // fail the test
       }
@@ -143,11 +154,7 @@ public class TestTokenCache {
     
     p1 = new Path("file1");
     p2 = new Path("file2");
-    
     p1 = fs.makeQualified(p1);
-    // do not qualify p2
-    TokenCache.setTokenStorage(new TokenStorage());
-    TokenCache.obtainTokensForNamenodesInternal(new Path [] {p1, p2}, jConf);
   }
 
   @AfterClass
@@ -176,7 +183,6 @@ public class TestTokenCache {
       throw new IOException(e);
     }
     
-    System.out.println("writing secret keys into " + tokenFileName);
     try {
       File p  = new File(tokenFileName.getParent().toString());
       p.mkdirs();
@@ -193,8 +199,6 @@ public class TestTokenCache {
     Map<String, String> map;
     map = mapper.readValue(new File(tokenFileName.toString()), Map.class);
     assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS);
-    
-    System.out.println("file " + tokenFileName + " verified; size="+ map.size());
   }
   
   /**
@@ -203,9 +207,6 @@ public class TestTokenCache {
    */
   @Test
   public void testTokenCache() throws IOException {
-    
-    System.out.println("running dist job");
-    
     // make sure JT starts
     jConf = mrCluster.createJobConf();
     
@@ -241,12 +242,10 @@ public class TestTokenCache {
    */
   @Test
   public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException {
-    
-    System.out.println("running local job");
     // this is local job
     String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; 
     jConf.set("tokenCacheFile", tokenFileName.toString());
-    
+
     int res = -1;
     try {
       res = ToolRunner.run(jConf, new MySleepJob(), args);
@@ -262,21 +261,23 @@ public class TestTokenCache {
   public void testGetTokensForNamenodes() throws IOException {
     FileSystem fs = dfsCluster.getFileSystem();
 
+    Credentials credentials = new Credentials();
+    TokenCache.obtainTokensForNamenodesInternal(credentials, new Path [] {p1, p2},
+                                        jConf);
     // this token is keyed by hostname:port key.
     String fs_addr = TokenCache.buildDTServiceName(p1.toUri()); 
-    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);
-    System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " +  nnt);
+    Token<DelegationTokenIdentifier> nnt =
+      TokenCache.getDelegationToken(credentials, fs_addr);
 
     assertNotNull("Token for nn is null", nnt);
 
     // verify the size
-    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    Collection<Token<? extends TokenIdentifier>> tns =
+      credentials.getAllTokens();
     assertEquals("number of tokens is not 1", 1, tns.size());
 
     boolean found = false;
     for(Token<? extends TokenIdentifier> t: tns) {
-      System.out.println("kind="+t.getKind() + ";servic=" + t.getService() + ";str=" + t.toString());
-
       if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
           t.getService().equals(new Text(fs_addr))) {
         found = true;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Mar  4 03:56:23 2011
@@ -38,7 +38,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
@@ -238,7 +238,7 @@ public class TestDelegationTokenRenewal 
     String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
     String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
     
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     
     // register the token for renewal
     ts.addToken(new Text(nn1), token1);
@@ -273,7 +273,7 @@ public class TestDelegationTokenRenewal 
     // add another token ( that expires in 2 secs). Then remove it, before
     // time is up.
     // Wait for 3 secs , and make sure no renews were called
-    ts = new TokenStorage();
+    ts = new Credentials();
     MyToken token4 = dfs.getDelegationToken(new Text("user4"));
     
     //to cause this one to be set for renew in 2 secs

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestJobCredentials.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestJobCredentials.java?rev=1077251&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestJobCredentials.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestJobCredentials.java Fri Mar  4 03:56:23 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.security;
+
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+import javax.crypto.KeyGenerator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/** Round-trip (write then readFields) test for {@link Credentials}. */
+public class TestJobCredentials {
+  private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
+  private static final File tmpDir =
+    new File(System.getProperty("test.build.data", "/tmp"), "mapred");
+
+  /** Creates the scratch directory, including any missing parents. */
+  @Before
+  public void setUp() {
+    tmpDir.mkdirs();
+  }
+
+  /** Writes tokens and secret keys to a file and checks they read back. */
+  @Test
+  public <T extends TokenIdentifier> void testReadWriteStorage()
+  throws IOException, NoSuchAlgorithmException {
+    Credentials ts = new Credentials();
+    Token<T> token1 = new Token<T>();
+    Token<T> token2 = new Token<T>();
+    Text service1 = new Text("service1");
+    Text service2 = new Text("service2");
+    token1.setService(service1);
+    token2.setService(service2);
+    ts.addToken(new Text("sometoken1"), token1);
+    ts.addToken(new Text("sometoken2"), token2);
+
+    // generate secret keys, keeping a local copy to compare against
+    final KeyGenerator kg = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
+    String alias = "alias";
+    Map<Text, byte[]> m = new HashMap<Text, byte[]>(10);
+    for (int i = 0; i < 10; i++) {
+      Key key = kg.generateKey();
+      m.put(new Text(alias + i), key.getEncoded());
+      ts.addSecretKey(new Text(alias + i), key.getEncoded());
+    }
+
+    // write to a file; close in finally so a failed write leaks no handle
+    File tmpFileName = new File(tmpDir, "tokenStorageTest");
+    DataOutputStream dos =
+      new DataOutputStream(new FileOutputStream(tmpFileName));
+    try {
+      ts.write(dos);
+    } finally {
+      dos.close();
+    }
+
+    // read it back into a fresh object
+    ts = new Credentials();
+    DataInputStream dis =
+      new DataInputStream(new FileInputStream(tmpFileName));
+    try {
+      ts.readFields(dis);
+    } finally {
+      dis.close();
+    }
+
+    // get the tokens and compare the services
+    Collection<Token<? extends TokenIdentifier>> list = ts.getAllTokens();
+    assertEquals("getAllTokens should return collection of size 2",
+        2, list.size());
+    boolean foundFirst = false;
+    boolean foundSecond = false;
+    for (Token<? extends TokenIdentifier> token : list) {
+      if (token.getService().equals(service1)) {
+        foundFirst = true;
+      }
+      if (token.getService().equals(service2)) {
+        foundSecond = true;
+      }
+    }
+    assertTrue("Tokens for services service1 and service2 must be present",
+        foundFirst && foundSecond);
+    // compare secret keys
+    assertEquals("wrong number of keys in the Storage",
+        m.size(), ts.numberOfSecretKeys());
+    for (Text a : m.keySet()) {
+      byte[] kTS = ts.getSecretKey(a);
+      byte[] kLocal = m.get(a);
+      assertTrue("keys don't match for " + a,
+          WritableComparator.compareBytes(kTS, 0, kTS.length, kLocal,
+              0, kLocal.length) == 0);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java Fri Mar  4 03:56:23 2011
@@ -32,7 +32,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -80,7 +80,7 @@ public class TestDelegationTokenFetcher 
     new DelegationTokenFetcher(dfs, out, ugi).go();
     
     // now read the data back in and verify correct values
-    TokenStorage ts = new TokenStorage();
+    Credentials ts = new Credentials();
     DataInputStream dis = 
       new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
     ts.readFields(dis);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1077251&r1=1077250&r2=1077251&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java Fri Mar  4 03:56:23 2011
@@ -621,14 +621,15 @@ public class DistCp implements Tool {
   }
 
   /** Sanity check for srcPath */
-  private static void checkSrcPath(Configuration conf, List<Path> srcPaths
-      ) throws IOException {
+  private static void checkSrcPath(Configuration conf, 
+                                   List<Path> srcPaths, JobConf jobConf)
+  throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
     
     // get tokens for all the required FileSystems..
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
-    TokenCache.obtainTokensForNamenodes(ps, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
 
     for (Path p : srcPaths) {
       FileSystem fs = p.getFileSystem(conf);
@@ -649,9 +650,10 @@ public class DistCp implements Tool {
       ) throws IOException {
     LOG.info("srcPaths=" + args.srcs);
     LOG.info("destPath=" + args.dst);
-    checkSrcPath(conf, args.srcs);
 
     JobConf job = createJobConf(conf);
+    
+    checkSrcPath(conf, args.srcs, job);
     if (args.preservedAttributes != null) {
       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
     }
@@ -1027,7 +1029,8 @@ public class DistCp implements Tool {
     FileSystem dstfs = args.dst.getFileSystem(conf);
     
     // get tokens for all the required FileSystems..
-    TokenCache.obtainTokensForNamenodes(new Path[] {args.dst}, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), 
+                                        new Path[] {args.dst}, conf);
     
     boolean dstExists = dstfs.exists(args.dst);
     boolean dstIsDir = false;