Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:40:10 UTC

svn commit: r1077097 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/security/ mapred/org/apache/hadoop/mapreduce/security/token/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:40:10 2011
New Revision: 1077097

URL: http://svn.apache.org/viewvc?rev=1077097&view=rev
Log:
commit a9f6714543b5b7623b8da159e8f2942b3f38dc40
Author: Jitendra Nath Pandey <ji...@yahoo-inc.com>
Date:   Thu Jan 7 11:15:53 2010 -0800

    MAPREDUCE-1250 from https://issues.apache.org/jira/secure/attachment/12429629/MR-1250-0_20.2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1250. Refactor job token to use a common token interface.
    +    (Jitendra Nath Pandey)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:40:10 2011
@@ -34,10 +34,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.LogManager;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -69,7 +71,7 @@ class Child {
     // file name is passed thru env
     String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
     FileSystem localFs = FileSystem.getLocal(defaultConf);
-    JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+    Token<JobTokenIdentifier> jt = loadJobToken(jobTokenFile, localFs);
     LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
 
     TaskUmbilicalProtocol umbilical =
@@ -150,7 +152,7 @@ class Child {
         JobConf job = new JobConf(task.getJobFile());
 
         // set the jobTokenFile into task
-        task.setJobTokens(jt);
+        task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
 
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
@@ -219,16 +221,16 @@ class Child {
   }
   
   /**
-   * load secret keys from a file
+   * load job token from a file
    * @param jobTokenFile
    * @param conf
    * @throws IOException
    */
-  private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS) 
+  private static Token<JobTokenIdentifier> loadJobToken(String jobTokenFile, FileSystem localFS) 
   throws IOException {
     Path localJobTokenFile = new Path (jobTokenFile);
     FSDataInputStream in = localFS.open(localJobTokenFile);
-    JobTokens jt = new JobTokens();
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
     jt.readFields(in);
         
     LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
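
For reference, the new Child-side flow shown above reduces to: read the serialized token that the JobTracker wrote, then turn its password into the HMAC key handed to the task. A minimal sketch, assuming only the APIs visible in this patch; the class and method names here are illustrative, not part of the commit.

    import java.io.IOException;

    import javax.crypto.SecretKey;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    class LoadTokenSketch {
      // Mirrors Child.loadJobToken() plus the createSecretKey() call made
      // before the task runs.
      static SecretKey loadShuffleSecret(String jobTokenFile, FileSystem localFs)
          throws IOException {
        Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
        FSDataInputStream in = localFs.open(new Path(jobTokenFile));
        try {
          jt.readFields(in);   // deserialize the token written by the JobTracker
        } finally {
          in.close();
        }
        // the token's password becomes the shared HMAC secret for the shuffle
        return JobTokenSecretManager.createSecretKey(jt.getPassword());
      }
    }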

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:40:10 2011
@@ -39,9 +39,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -49,6 +50,7 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -503,7 +505,7 @@ class JobInProgress {
     //
     // generate security keys needed by Tasks
     //
-    generateJobTokens(fs, jobtracker.getSystemDirectoryForJob(jobId));
+    generateJobToken(fs);
     
     //
     // read input splits and create a map per a split
@@ -3077,30 +3079,25 @@ class JobInProgress {
       );
     }
   }
-  
+
   /**
-   * generate keys and save it into the file
-   * @param jobDir
+   * generate job token and save it into the file
    * @throws IOException
    */
-  private void generateJobTokens(FileSystem fs, Path jobDir) throws IOException{
-    Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+  private void generateJobToken(FileSystem fs) throws IOException {
+    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+    Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+    // we need to create this file using the jobtracker's filesystem
     FSDataOutputStream os = fs.create(keysFile);
-    //create JobTokens file and add key to it
-    JobTokens jt = new JobTokens();
-    byte [] key;
-    try {
-      // new key
-      key = SecureShuffleUtils.getNewEncodedKey();
-    } catch (java.security.GeneralSecurityException e) {
-      throw new IOException(e);
-    }
-    // remember the key 
-    jt.setShuffleJobToken(key);
-    // other keys..
-    jt.write(os);
+    //create JobToken file and write token to it
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+        .toString()));
+    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
+        jobtracker.getJobTokenSecretManager());
+    token.setService(identifier.getJobId());
+    token.write(os);
     os.close();
-    LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+    LOG.debug("jobToken generated and stored in "+ keysFile.toUri().getPath());
   }
-
+  
 }
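
On the JobTracker side, generateJobToken() now signs a JobTokenIdentifier with the tracker-wide JobTokenSecretManager and serializes the resulting token into the job's system directory under the jobToken file name, where the TaskTracker later localizes it. A condensed sketch of that sequence; the class and method names are illustrative, not from the patch.

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    class WriteTokenSketch {
      static void writeJobToken(FileSystem fs, Path jobDir, String jobId,
          JobTokenSecretManager secretManager) throws IOException {
        // the secret manager derives the token's password from its master key
        JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId));
        Token<JobTokenIdentifier> token =
            new Token<JobTokenIdentifier>(identifier, secretManager);
        token.setService(identifier.getJobId());
        FSDataOutputStream os = fs.create(
            new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME));
        try {
          token.write(os);   // tasks read this back with Token.readFields()
        } finally {
          os.close();
        }
      }
    }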

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:40:10 2011
@@ -100,6 +100,7 @@ import org.apache.hadoop.util.VersionInf
 
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /*******************************************************
@@ -171,6 +172,13 @@ public class JobTracker implements MRCon
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
 
+  private final JobTokenSecretManager jobTokenSecretManager
+    = new JobTokenSecretManager();
+
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
+
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
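
Each JobTokenSecretManager instance seeds itself with a random master key, so only the JobTracker that issued a token can recompute its password; the TaskTracker's own instance (added further down) is used purely as a per-job cache of passwords shipped inside tokens. A small sketch of that property, with an illustrative job id; the two passwords are expected to differ.

    import java.util.Arrays;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;

    class MasterKeySketch {
      public static void main(String[] args) {
        JobTokenIdentifier id = new JobTokenIdentifier(new Text("job_200912310101_0001"));
        byte[] p1 = new JobTokenSecretManager().createPassword(id);
        byte[] p2 = new JobTokenSecretManager().createPassword(id);
        // different random master keys => different passwords for the same identifier
        System.out.println("passwords match: " + Arrays.equals(p1, p2));
      }
    }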

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:40:10 2011
@@ -48,6 +48,8 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -85,10 +87,7 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.commons.codec.binary.Base64;
-import java.security.GeneralSecurityException;
 
 /** A Reduce task. */
 class ReduceTask extends Task {
@@ -1152,14 +1151,14 @@ class ReduceTask extends Task {
       private CompressionCodec codec = null;
       private Decompressor decompressor = null;
       
-      private final byte[] shuffleJobToken;
+      private final SecretKey jobTokenSecret;
       
-      public MapOutputCopier(JobConf job, Reporter reporter, byte [] shuffleJobToken) {
+      public MapOutputCopier(JobConf job, Reporter reporter, SecretKey jobTokenSecret) {
         setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
         LOG.debug(getName() + " created");
         this.reporter = reporter;
 
-        this.shuffleJobToken = shuffleJobToken;       	
+        this.jobTokenSecret = jobTokenSecret;
  
         shuffleConnectionTimeout =
           job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
@@ -1389,9 +1388,8 @@ class ReduceTask extends Task {
         URLConnection connection = url.openConnection();
 
         // generate hash of the url
-        SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
         String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-        String encHash = ssutil.hashFromString(msgToEncode);
+        String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
 
         // put url hash into http header
         connection.addRequestProperty(
@@ -1407,7 +1405,7 @@ class ReduceTask extends Task {
         }       
         LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
         // verify that replyHash is HMac of encHash
-        ssutil.verifyReply(replyHash, encHash);
+        SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
         LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
  
         // Validate header from map output
@@ -1893,8 +1891,8 @@ class ReduceTask extends Task {
       
       // start all the copying threads
       for (int i=0; i < numCopiers; i++) {
-        MapOutputCopier copier = new MapOutputCopier(conf, reporter,
-            reduceTask.getJobTokens().getShuffleJobToken());
+        MapOutputCopier copier = new MapOutputCopier(conf, reporter, 
+            reduceTask.getJobTokenSecret());
         copiers.add(copier);
         copier.start();
       }
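
On the fetch side, the copier's handshake is now a pair of static calls keyed by the task's SecretKey: hash the map-output URL, send it in the UrlHash header, and verify that the server's ReplyHash was computed with the same key. A trimmed sketch with the connection setup and the actual transfer omitted; the class and method names are illustrative.

    import java.io.IOException;
    import java.net.URL;
    import java.net.URLConnection;

    import javax.crypto.SecretKey;

    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    class ShuffleClientSketch {
      static void authenticate(URL url, URLConnection connection,
          SecretKey jobTokenSecret) throws IOException {
        String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
        String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
        // put the url hash into the http header
        connection.addRequestProperty(
            SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
        connection.connect();
        String replyHash = connection.getHeaderField(
            SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
        if (replyHash == null) {
          throw new IOException("missing reply hash from TaskTracker");
        }
        // throws IOException unless replyHash is an HMAC of encHash under our key
        SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
      }
    }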

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:40:10 2011
@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -47,8 +49,6 @@ import org.apache.hadoop.io.serializer.D
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.security.JobTokens;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -151,6 +151,7 @@ abstract public class Task implements Wr
   protected TaskUmbilicalProtocol umbilical;
   private int numSlotsRequired;
   protected JobTokens jobTokens=null; // storage of the secret keys
+  protected SecretKey tokenSecret;
 
   ////////////////////////////////////////////
   // Constructors
@@ -204,19 +205,19 @@ abstract public class Task implements Wr
   }
 
   /**
-   * set JobToken storage 
-   * @param jt
+   * Set the job token secret 
+   * @param tokenSecret the secret
    */
-  public void setJobTokens(JobTokens jt) {
-    this.jobTokens = jt;
+  public void setJobTokenSecret(SecretKey tokenSecret) {
+    this.tokenSecret = tokenSecret;
   }
 
   /**
-   * get JobToken storage
-   * @return storage object
+   * Get the job token secret
+   * @return the token secret
    */
-  public JobTokens getJobTokens() {
-    return this.jobTokens;
+  public SecretKey getJobTokenSecret() {
+    return this.tokenSecret;
   }
 
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:40:10 2011
@@ -43,6 +43,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
+import javax.crypto.SecretKey;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -74,8 +75,9 @@ import org.apache.hadoop.mapred.TaskStat
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -90,6 +92,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -189,6 +192,13 @@ public class TaskTracker 
    */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+  private final JobTokenSecretManager jobTokenSecretManager
+    = new JobTokenSecretManager();
+
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
+
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
@@ -912,9 +922,9 @@ public class TaskTracker 
         localizeJobTokenFile(t.getUser(), jobId, localJobConf);       
         FSDataInputStream in = localFs.open(new Path(
             rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
-        JobTokens jt = new JobTokens();
+        Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
         jt.readFields(in); 
-        rjob.jobTokens = jt; // store JobToken object per job
+        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
  
         rjob.localized = true;
         taskController.initializeJob(jobId);
@@ -1544,7 +1554,7 @@ public class TaskTracker 
     synchronized(runningJobs) {
       runningJobs.remove(jobId);
     }
-    
+    getJobTokenSecretManager().removeTokenForJob(jobId.toString());  
   }      
     
     
@@ -2911,7 +2921,6 @@ public class TaskTracker 
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
-    JobTokens jobTokens;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3221,14 +3230,8 @@ public class TaskTracker 
     private void verifyRequest(HttpServletRequest request, 
         HttpServletResponse response, TaskTracker tracker, String jobId) 
     throws IOException {
-      JobTokens jt = null;
-      synchronized (tracker.runningJobs) {
-        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
-        if (rjob == null) {
-          throw new IOException("Unknown job " + jobId + "!!");
-        }
-        jt = rjob.jobTokens;
-      }
+      SecretKey tokenSecret = tracker.getJobTokenSecretManager()
+          .retrieveTokenSecret(jobId);
       // string to encrypt
       String enc_str = SecureShuffleUtils.buildMsgFrom(request);
       
@@ -3242,17 +3245,16 @@ public class TaskTracker 
       LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
           urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
 
-      SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
       // verify - throws exception
       try {
-        ssutil.verifyReply(urlHashStr, enc_str);
+        SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
       } catch (IOException ioe) {
         response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
         throw ioe;
       }
       
       // verification passed - encode the reply
-      String reply = ssutil.generateHash(urlHashStr.getBytes());
+      String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
       response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
       
       len = reply.length();
@@ -3499,7 +3501,7 @@ public class TaskTracker 
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory, 
-          jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+          jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
       
       FileStatus status = null;
       long jobTokenSize = -1;
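
On the serving side, the shuffle servlet no longer digs the key out of RunningJob; it asks the tracker-wide JobTokenSecretManager, which is populated with the job's token at localization time and cleared when the job finishes. A condensed sketch of the verification path; the servlet plumbing and method name are illustrative.

    import java.io.IOException;

    import javax.crypto.SecretKey;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;

    class ShuffleServerSketch {
      static void verifyShuffleRequest(HttpServletRequest request,
          HttpServletResponse response, JobTokenSecretManager secretManager,
          String jobId) throws IOException {
        SecretKey tokenSecret = secretManager.retrieveTokenSecret(jobId);
        String enc_str = SecureShuffleUtils.buildMsgFrom(request);
        String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
        try {
          // the client must have hashed the same URL with the same job secret
          SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
        } catch (IOException ioe) {
          response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
          throw ioe;
        }
        // prove to the client that this TaskTracker also holds the secret
        String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
        response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
      }
    }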

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Mar  4 03:40:10 2011
@@ -22,15 +22,12 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URL;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
 
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.SecretKey;
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.record.Utils;
 
 /**
@@ -41,62 +38,17 @@ import org.apache.hadoop.record.Utils;
 public class SecureShuffleUtils {
   public static final String HTTP_HEADER_URL_HASH = "UrlHash";
   public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-  public static KeyGenerator kg = null;
-  public static String DEFAULT_ALG="HmacSHA1";
-  
-  private SecretKeySpec secretKey;
-  private Mac mac;
-  
   /**
-   * static generate keys
-   * @return new encoded key
-   * @throws NoSuchAlgorithmException
+   * file name used on HDFS for generated job token
    */
-  public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
-    SecretKeySpec key = generateKey(DEFAULT_ALG);
-    return key.getEncoded();
-  }
-  
-  private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
-    if(kg==null) {
-      kg = KeyGenerator.getInstance(alg);
-    }
-    return (SecretKeySpec) kg.generateKey();
-  }
-
-  /**
-   * Create a util object with alg and key
-   * @param sKeyEncoded
-   * @throws NoSuchAlgorithmException
-   * @throws InvalidKeyException
-   */
-  public SecureShuffleUtils(byte [] sKeyEncoded) 
-  throws  IOException{
-    secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
-    try {
-      mac = Mac.getInstance(DEFAULT_ALG);
-      mac.init(secretKey);
-    } catch (NoSuchAlgorithmException nae) {
-      throw new IOException(nae);
-    } catch( InvalidKeyException ie) {
-      throw new IOException(ie);
-    }
-  }
-  
-  /** 
-   * get key as byte[]
-   * @return encoded key
-   */
-  public byte [] getEncodedKey() {
-    return secretKey.getEncoded();
-  }
+  public static final String JOB_TOKEN_FILENAME = "jobToken";
   
   /**
    * Base64 encoded hash of msg
    * @param msg
    */
-  public String generateHash(byte[] msg) {
-    return new String(Base64.encodeBase64(generateByteHash(msg)));
+  public static String generateHash(byte[] msg, SecretKey key) {
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
   }
   
   /**
@@ -104,8 +56,8 @@ public class SecureShuffleUtils {
    * @param msg
    * @return
    */
-  private byte[] generateByteHash(byte[] msg) {
-    return mac.doFinal(msg);
+  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+    return JobTokenSecretManager.computeHash(msg, key);
   }
   
   /**
@@ -113,20 +65,21 @@ public class SecureShuffleUtils {
    * @param newHash
    * @return true if is the same
    */
-  private boolean verifyHash(byte[] hash, byte[] msg) {
-    byte[] msg_hash = generateByteHash(msg);
+  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+    byte[] msg_hash = generateByteHash(msg, key);
     return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
   }
   
   /**
    * Aux util to calculate hash of a String
    * @param enc_str
+   * @param key
    * @return Base64 encodedHash
    * @throws IOException
    */
-  public String hashFromString(String enc_str) 
+  public static String hashFromString(String enc_str, SecretKey key) 
   throws IOException {
-    return generateHash(enc_str.getBytes()); 
+    return generateHash(enc_str.getBytes(), key); 
   }
   
   /**
@@ -135,11 +88,11 @@ public class SecureShuffleUtils {
    * @param msg
    * @throws IOException if not the same
    */
-  public void verifyReply(String base64Hash, String msg)
+  public static void verifyReply(String base64Hash, String msg, SecretKey key)
   throws IOException {
     byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
     
-    boolean res = verifyHash(hash, msg.getBytes());
+    boolean res = verifyHash(hash, msg.getBytes(), key);
     
     if(res != true) {
       throw new IOException("Verification of the hashReply failed");
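
With the key generation and Mac state removed, SecureShuffleUtils is reduced to stateless HMAC helpers that both shuffle peers call with the same SecretKey. A tiny round-trip sketch; the key bytes and message are illustrative.

    import java.io.IOException;

    import javax.crypto.SecretKey;

    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;

    class HashRoundTripSketch {
      public static void main(String[] args) throws IOException {
        // in the real flow both sides derive this key from the job token's password
        SecretKey key = JobTokenSecretManager.createSecretKey("demo-password".getBytes());
        String msg = "mapOutput?job=job_0001&map=m_000000&reduce=0";
        String hash = SecureShuffleUtils.hashFromString(msg, key);   // sender
        SecureShuffleUtils.verifyReply(hash, msg, key);              // receiver; throws on mismatch
        System.out.println("hash verified");
      }
    }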

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=1077097&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Fri Mar  4 03:40:10 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+public class JobTokenIdentifier extends TokenIdentifier {
+  private Text jobid;
+  final static Text KIND_NAME = new Text("mapreduce.job");
+  
+  /**
+   * Create a job token identifier from a jobid
+   * @param jobid the jobid to use
+   */
+  public JobTokenIdentifier(Text jobid) {
+    this.jobid = jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+  
+  /**
+   * Get the jobid
+   * @return the jobid
+   */
+  public Text getJobId() {
+    return jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobid.readFields(in);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobid.write(out);
+  }
+}
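
The identifier is deliberately thin: its serialized form is just the job id Text, and those bytes are what JobTokenSecretManager.createPassword() HMACs with its master key. A short sketch of its contents; the job id is illustrative.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;

    class IdentifierSketch {
      public static void main(String[] args) throws IOException {
        JobTokenIdentifier id = new JobTokenIdentifier(new Text("job_200912310101_0001"));
        System.out.println(id.getKind());    // mapreduce.job
        System.out.println(id.getJobId());   // job_200912310101_0001
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        id.write(new DataOutputStream(buf)); // serializes only the job id Text
        System.out.println("serialized identifier: " + buf.size() + " bytes");
      }
    }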

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java?rev=1077097&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java Fri Mar  4 03:40:10 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+  private final SecretKey masterKey;
+  private final Map<String, SecretKey> currentJobTokens;
+
+  /**
+   * Convert the byte[] to a secret key
+   * @param key the byte[] to create the secret key from
+   * @return the secret key
+   */
+  public static SecretKey createSecretKey(byte[] key) {
+    return SecretManager.createSecretKey(key);
+  }
+  
+  /**
+   * Compute the HMAC hash of the message using the key
+   * @param msg the message to hash
+   * @param key the key to use
+   * @return the computed hash
+   */
+  public static byte[] computeHash(byte[] msg, SecretKey key) {
+    return createPassword(msg, key);
+  }
+  
+  /**
+   * Default constructor
+   */
+  public JobTokenSecretManager() {
+    this.masterKey = generateSecret();
+    this.currentJobTokens = new TreeMap<String, SecretKey>();
+  }
+  
+  /**
+   * Create a new password/secret for the given job token identifier.
+   * @param identifier the job token identifier
+   * @return token password/secret
+   */
+  @Override
+  public byte[] createPassword(JobTokenIdentifier identifier) {
+    byte[] result = createPassword(identifier.getBytes(), masterKey);
+    return result;
+  }
+
+  /**
+   * Add the job token of a job to cache
+   * @param jobId the job that owns the token
+   * @param token the job token
+   */
+  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+    SecretKey tokenSecret = createSecretKey(token.getPassword());
+    synchronized (currentJobTokens) {
+      currentJobTokens.put(jobId, tokenSecret);
+    }
+  }
+
+  /**
+   * Remove the cached job token of a job from cache
+   * @param jobId the job whose token is to be removed
+   */
+  public void removeTokenForJob(String jobId) {
+    synchronized (currentJobTokens) {
+      currentJobTokens.remove(jobId);
+    }
+  }
+  
+  /**
+   * Look up the token password/secret for the given jobId.
+   * @param jobId the jobId to look up
+   * @return token password/secret as SecretKey
+   * @throws InvalidToken
+   */
+  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+    SecretKey tokenSecret = null;
+    synchronized (currentJobTokens) {
+      tokenSecret = currentJobTokens.get(jobId);
+    }
+    if (tokenSecret == null) {
+      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+    }
+    return tokenSecret;
+  }
+  
+  /**
+   * Look up the token password/secret for the given job token identifier.
+   * @param identifier the job token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(JobTokenIdentifier identifier)
+      throws InvalidToken {
+    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+  }
+
+}
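
The secret manager plays two roles: on the JobTracker it mints token passwords from a random master key, while on the TaskTracker it only caches the passwords carried by shipped tokens. A sketch of the end-to-end symmetry that makes shuffle verification work; the job id and message bytes are illustrative.

    import java.util.Arrays;

    import javax.crypto.SecretKey;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    class TokenLifecycleSketch {
      public static void main(String[] args) throws Exception {
        String jobId = "job_200912310101_0001";

        // JobTracker side: sign the identifier with this instance's master key
        JobTokenSecretManager jtManager = new JobTokenSecretManager();
        Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(
            new JobTokenIdentifier(new Text(jobId)), jtManager);

        // TaskTracker side: cache the shipped token's password under the job id
        JobTokenSecretManager ttManager = new JobTokenSecretManager();
        ttManager.addTokenForJob(jobId, token);
        SecretKey cached = ttManager.retrieveTokenSecret(jobId);

        // Task side: derive the same secret directly from the token
        SecretKey derived = JobTokenSecretManager.createSecretKey(token.getPassword());

        byte[] h1 = JobTokenSecretManager.computeHash("shuffle-msg".getBytes(), cached);
        byte[] h2 = JobTokenSecretManager.computeHash("shuffle-msg".getBytes(), derived);
        System.out.println("hashes match: " + Arrays.equals(h1, h2));   // expect true
      }
    }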

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java Fri Mar  4 03:40:10 2011
@@ -27,9 +27,14 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 
+import javax.crypto.SecretKey;
+
 import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +45,7 @@ public class TestShuffleJobToken {
   private static URL baseUrl;
   private static File dir;
   private static final String JOB_ID = "job_20091117075357176_0001";
+  private static final String BAD_JOB_ID = "job_20091117075357176_0002";
   
   // create fake url
   private URL getMapOutputURL(String host)  throws MalformedURLException {
@@ -86,25 +92,26 @@ public class TestShuffleJobToken {
     URL url = getMapOutputURL(baseUrl.toString());
     String enc_str = SecureShuffleUtils.buildMsgFrom(url);
     URLConnection connectionGood = url.openConnection();
-
-    // create key 
-    byte [] key= SecureShuffleUtils.getNewEncodedKey();
     
-    // create fake TaskTracker - needed for keys storage
-    JobTokens jt = new JobTokens();
-    jt.setShuffleJobToken(key);
     TaskTracker tt  = new TaskTracker();
+    JobTokenSecretManager jtSecretManager = new JobTokenSecretManager();
+    // create fake TaskTracker - needed for keys storage
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(JOB_ID));
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(identifier,
+        jtSecretManager);
+    SecretKey tokenSecret = JobTokenSecretManager.createSecretKey(jt.getPassword());
     addJobToken(tt, JOB_ID, jt); // fake id
     server.setAttribute("task.tracker", tt);
 
     // encode the url
-    SecureShuffleUtils mac = new SecureShuffleUtils(key);
-    String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+    String urlHashGood = SecureShuffleUtils.generateHash(enc_str.getBytes(), tokenSecret); // valid hash
     
     // another the key
-    byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
-    mac = new SecureShuffleUtils(badKey);
-    String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash 
+    JobTokenIdentifier badIdentifier = new JobTokenIdentifier(new Text(BAD_JOB_ID));
+    Token<JobTokenIdentifier> badToken = new Token<JobTokenIdentifier>(badIdentifier,
+        jtSecretManager);
+    SecretKey badSecret = JobTokenSecretManager.createSecretKey(badToken.getPassword());
+    String urlHashBad = SecureShuffleUtils.generateHash(enc_str.getBytes(), badSecret); // invalid hash 
     
     // put url hash into http header
     connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
@@ -135,13 +142,13 @@ public class TestShuffleJobToken {
     } 
   }
   /*Note that this method is there for a unit testcase (TestShuffleJobToken)*/
-  void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+  void addJobToken(TaskTracker tt, String jobIdStr, Token<JobTokenIdentifier> token) {
     JobID jobId = JobID.forName(jobIdStr);
     TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
-    rJob.jobTokens = jt;
     synchronized (tt.runningJobs) {
       tt.runningJobs.put(jobId, rJob);
     }
+    tt.getJobTokenSecretManager().addTokenForJob(jobIdStr, token);
   }
 
 }