You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/12/22 02:33:14 UTC

svn commit: r893055 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/security/ src/java/org/apache/hadoop/mapreduce/security/token/ src/java/org/apache/hadoop/mapreduce/task/reduce/ src/test/mapre...

Author: ddas
Date: Tue Dec 22 01:33:13 2009
New Revision: 893055

URL: http://svn.apache.org/viewvc?rev=893055&view=rev
Log:
MAPREDUCE-1250. Refactors the JobToken to use Common's Token interface. Contributed by Kan Zhang.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Dec 22 01:33:13 2009
@@ -87,6 +87,9 @@
     MAPREDUCE-181. Changes the job submission process to be secure.
     (Devaraj Das)
 
+    MAPREDUCE-1250. Refactors the JobToken to use Common's Token interface.
+    (Kan Zhang via ddas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue Dec 22 01:33:13 2009
@@ -34,10 +34,12 @@
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.LogManager;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -73,7 +75,7 @@
     // file name is passed thru env
     String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
     FileSystem localFs = FileSystem.getLocal(defaultConf);
-    JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+    Token<JobTokenIdentifier> jt = loadJobToken(jobTokenFile, localFs);
     LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
     
     TaskUmbilicalProtocol umbilical =
@@ -154,7 +156,7 @@
         JobConf job = new JobConf(task.getJobFile());
         
         // set the jobTokenFile into task
-        task.setJobTokens(jt);
+        task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
         
         // setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
         // can only see files down and under attemtdir only.
@@ -226,16 +228,16 @@
   }
   
   /**
-   * load secret keys from a file
+   * load job token from a file
    * @param jobTokenFile
    * @param conf
    * @throws IOException
    */
-  private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS) 
+  private static Token<JobTokenIdentifier> loadJobToken(String jobTokenFile, FileSystem localFS) 
   throws IOException {
     Path localJobTokenFile = new Path (jobTokenFile);
     FSDataInputStream in = localFS.open(localJobTokenFile);
-    JobTokens jt = new JobTokens();
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
     jt.readFields(in);
         
     LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Dec 22 01:33:13 2009
@@ -43,6 +43,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
@@ -62,7 +63,7 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
@@ -74,6 +75,7 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -591,7 +593,7 @@
     //
     // generate security keys needed by Tasks
     //
-    generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+    generateJobToken();
     
     //
     // read input splits and create a map per a split
@@ -3521,29 +3523,29 @@
   }
  
   /**
-   * generate keys and save it into the file
-   * @param jobDir
-   * @throws IOException
+   * Generates the job token for this job and writes it, using the
+   * jobtracker's filesystem, to the file named by
+   * {@link SecureShuffleUtils#JOB_TOKEN_FILENAME} in the job's system directory.
+   * @throws IOException if the token file cannot be created or written
    */
-  private void generateJobTokens(Path jobDir) throws IOException{
-    Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+  private void generateJobToken() throws IOException{
+    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+    Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
     // we need to create this file using the jobtracker's filesystem
     FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
-    //create JobTokens file and add key to it
-    JobTokens jt = new JobTokens();
-    byte [] key;
-    try {
-      // new key
-      key = SecureShuffleUtils.getNewEncodedKey();
-    } catch (java.security.GeneralSecurityException e) {
-      throw new IOException(e);
-    }
-    // remember the key 
-    jt.setShuffleJobToken(key);
-    // other keys..
-    jt.write(os);
-    os.close();
-    LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+    try {
+      //create JobToken file and write token to it
+      JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+          .toString()));
+      Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
+          jobtracker.getJobTokenSecretManager());
+      token.setService(identifier.getJobId());
+      token.write(os);
+    } finally {
+      // close even when building or writing the token fails, so the stream is not leaked
+      os.close();
+    }
+    LOG.debug("jobToken generated and stored in "+ keysFile.toUri().getPath());
   }
 
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Dec 22 01:33:13 2009
@@ -79,6 +79,7 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -172,6 +173,13 @@
   static final Clock DEFAULT_CLOCK = new Clock();
 
   private final JobHistory jobHistory;
+  
+  private final JobTokenSecretManager jobTokenSecretManager 
+    = new JobTokenSecretManager();
+  
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
 
   private MRAsyncDiskService asyncDiskService;
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Dec 22 01:33:13 2009
@@ -28,6 +28,8 @@
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -50,8 +52,6 @@
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-import org.apache.hadoop.mapreduce.security.JobTokens;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -140,7 +140,7 @@
   protected final Counters.Counter mergedMapOutputsCounter;
   private int numSlotsRequired;
   protected TaskUmbilicalProtocol umbilical;
-  protected JobTokens jobTokens=null; // storage of the secret keys
+  protected SecretKey tokenSecret;
 
   ////////////////////////////////////////////
   // Constructors
@@ -199,19 +199,19 @@
   }
 
   /**
-   * set JobToken storage 
-   * @param jt
+   * Set the job token secret 
+   * @param tokenSecret the secret
    */
-  public void setJobTokens(JobTokens jt) {
-    this.jobTokens = jt;
+  public void setJobTokenSecret(SecretKey tokenSecret) {
+    this.tokenSecret = tokenSecret;
   }
 
   /**
-   * get JobToken storage
-   * @return storage object
+   * Get the job token secret
+   * @return the token secret
    */
-  public JobTokens getJobTokens() {
-    return this.jobTokens;
+  public SecretKey getJobTokenSecret() {
+    return this.tokenSecret;
   }
 
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Dec 22 01:33:13 2009
@@ -43,6 +43,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
+import javax.crypto.SecretKey;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -73,8 +74,9 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -95,6 +97,7 @@
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -193,6 +196,8 @@
    */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+  private final JobTokenSecretManager jobTokenSecretManager 
+    = new JobTokenSecretManager();
 
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
@@ -426,6 +431,10 @@
     }
   }
 
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
+
   Localizer getLocalizer() {
     return localizer;
   }
@@ -896,10 +905,9 @@
                              localJobConf.getKeepFailedTaskFiles());
         FSDataInputStream in = localFs.open(new Path(
             rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
-        JobTokens jt = new JobTokens();
+        Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
         jt.readFields(in); 
-        rjob.jobTokens = jt; // store JobToken object per job
-        
+        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
         rjob.localized = true;
       }
     }
@@ -1598,6 +1606,7 @@
     synchronized(runningJobs) {
       runningJobs.remove(jobId);
     }
+    getJobTokenSecretManager().removeTokenForJob(jobId.toString());
   }
 
   /**
@@ -2940,7 +2949,6 @@
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
-    JobTokens jobTokens;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3306,14 +3314,8 @@
     private void verifyRequest(HttpServletRequest request, 
         HttpServletResponse response, TaskTracker tracker, String jobId) 
     throws IOException {
-      JobTokens jt = null;
-      synchronized (tracker.runningJobs) {
-        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
-        if (rjob == null) {
-          throw new IOException("Unknown job " + jobId + "!!");
-        }
-        jt = rjob.jobTokens;
-      }
+      SecretKey tokenSecret = tracker.getJobTokenSecretManager()
+          .retrieveTokenSecret(jobId);
       // string to encrypt
       String enc_str = SecureShuffleUtils.buildMsgFrom(request);
       
@@ -3327,17 +3329,16 @@
       LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
           urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
 
-      SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
       // verify - throws exception
       try {
-        ssutil.verifyReply(urlHashStr, enc_str);
+        SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
       } catch (IOException ioe) {
         response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
         throw ioe;
       }
       
       // verification passed - encode the reply
-      String reply = ssutil.generateHash(urlHashStr.getBytes());
+      String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
       response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
       
       len = reply.length();
@@ -3572,7 +3573,7 @@
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory, 
-          jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+          jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
       
       FileStatus status = null;
       long jobTokenSize = -1;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java Tue Dec 22 01:33:13 2009
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * get/set, store/load security keys
- * key's value - byte[]
- * store/load from DataInput/DataOuptut
- * List of currently store keys:
- *  jobToken for secure shuffle HTTP Get
- *
- */
-@InterfaceAudience.Private
-public class JobTokens implements Writable {
-  /**
-   * file name used on HDFS for generated keys
-   */
-  public static final String JOB_TOKEN_FILENAME = "jobTokens";
-
-  private byte [] shuffleJobToken = null; // jobtoken for shuffle (map output)
-
-  
-  /**
-   * returns the key value for the alias
-   * @return key for this alias
-   */
-  public byte[] getShuffleJobToken() {
-    return shuffleJobToken;
-  }
-  
-  /**
-   * sets the jobToken
-   * @param key
-   */
-  public void setShuffleJobToken(byte[] key) {
-    shuffleJobToken = key;
-  }
-  
-  /**
-   * stores all the keys to DataOutput
-   * @param out
-   * @throws IOException
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeCompressedByteArray(out, shuffleJobToken);
-  }
-  
-  /**
-   * loads all the keys
-   * @param in
-   * @throws IOException
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    shuffleJobToken = WritableUtils.readCompressedByteArray(in);
-  }
-}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Tue Dec 22 01:33:13 2009
@@ -22,16 +22,13 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URL;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
 
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.SecretKey;
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.record.Utils;
 
 /**
@@ -43,62 +40,17 @@
 public class SecureShuffleUtils {
   public static final String HTTP_HEADER_URL_HASH = "UrlHash";
   public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-  public static KeyGenerator kg = null;
-  public static String DEFAULT_ALG="HmacSHA1";
-  
-  private SecretKeySpec secretKey;
-  private Mac mac;
-  
   /**
-   * static generate keys
-   * @return new encoded key
-   * @throws NoSuchAlgorithmException
+   * file name used on HDFS for generated job token
    */
-  public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
-    SecretKeySpec key = generateKey(DEFAULT_ALG);
-    return key.getEncoded();
-  }
-  
-  private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
-    if(kg==null) {
-      kg = KeyGenerator.getInstance(alg);
-    }
-    return (SecretKeySpec) kg.generateKey();
-  }
-
-  /**
-   * Create a util object with alg and key
-   * @param sKeyEncoded
-   * @throws NoSuchAlgorithmException
-   * @throws InvalidKeyException
-   */
-  public SecureShuffleUtils(byte [] sKeyEncoded) 
-  throws  IOException{
-    secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
-    try {
-      mac = Mac.getInstance(DEFAULT_ALG);
-      mac.init(secretKey);
-    } catch (NoSuchAlgorithmException nae) {
-      throw new IOException(nae);
-    } catch( InvalidKeyException ie) {
-      throw new IOException(ie);
-    }
-  }
-  
-  /** 
-   * get key as byte[]
-   * @return encoded key
-   */
-  public byte [] getEncodedKey() {
-    return secretKey.getEncoded();
-  }
+  public static final String JOB_TOKEN_FILENAME = "jobToken";
   
   /**
    * Base64 encoded hash of msg
    * @param msg
    */
-  public String generateHash(byte[] msg) {
-    return new String(Base64.encodeBase64(generateByteHash(msg)));
+  public static String generateHash(byte[] msg, SecretKey key) {
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
   }
   
   /**
@@ -106,8 +58,8 @@
    * @param msg
    * @return
    */
-  private byte[] generateByteHash(byte[] msg) {
-    return mac.doFinal(msg);
+  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+    return JobTokenSecretManager.computeHash(msg, key);
   }
   
   /**
@@ -115,20 +67,21 @@
    * @param newHash
    * @return true if is the same
    */
-  private boolean verifyHash(byte[] hash, byte[] msg) {
-    byte[] msg_hash = generateByteHash(msg);
-    return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+    byte[] msg_hash = generateByteHash(msg, key); // HMAC of msg under the job token secret
+    return java.security.MessageDigest.isEqual(msg_hash, hash); // digest-safe equality check instead of a general-purpose byte compare
   }
   
   /**
    * Aux util to calculate hash of a String
    * @param enc_str
+   * @param key
    * @return Base64 encodedHash
    * @throws IOException
    */
-  public String hashFromString(String enc_str) 
+  public static String hashFromString(String enc_str, SecretKey key) 
   throws IOException {
-    return generateHash(enc_str.getBytes()); 
+    return generateHash(enc_str.getBytes(), key); 
   }
   
   /**
@@ -137,11 +90,11 @@
    * @param msg
    * @throws IOException if not the same
    */
-  public void verifyReply(String base64Hash, String msg)
+  public static void verifyReply(String base64Hash, String msg, SecretKey key)
   throws IOException {
     byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
     
-    boolean res = verifyHash(hash, msg.getBytes());
+    boolean res = verifyHash(hash, msg.getBytes(), key);
     
     if(res != true) {
       throw new IOException("Verification of the hashReply failed");

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=893055&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Tue Dec 22 01:33:13 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+@InterfaceAudience.Private
+public class JobTokenIdentifier extends TokenIdentifier {
+  private Text jobid;
+  final static Text KIND_NAME = new Text("mapreduce.job");
+  
+  /**
+   * Default constructor. Creates an identifier with an empty jobid so that
+   * it can be populated afterwards via {@link #readFields(DataInput)}.
+   */
+  public JobTokenIdentifier() {
+    this.jobid = new Text();
+  }
+
+  /**
+   * Create a job token identifier from a jobid
+   * @param jobid the jobid to use
+   */
+  public JobTokenIdentifier(Text jobid) {
+    this.jobid = jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+  
+  /**
+   * Get the jobid
+   * @return the jobid
+   */
+  public Text getJobId() {
+    return jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobid.readFields(in);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobid.write(out);
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java?rev=893055&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java Tue Dec 22 01:33:13 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job tokens. It creates token passwords from a generated master key and caches the secrets of job tokens, keyed by job id.
+ */
+@InterfaceAudience.Private
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+  private final SecretKey masterKey;
+  private final Map<String, SecretKey> currentJobTokens;
+
+  /**
+   * Convert the byte[] to a secret key
+   * @param key the byte[] to create the secret key from
+   * @return the secret key
+   */
+  public static SecretKey createSecretKey(byte[] key) {
+    return SecretManager.createSecretKey(key);
+  }
+  
+  /**
+   * Compute the HMAC hash of the message using the key
+   * @param msg the message to hash
+   * @param key the key to use
+   * @return the computed hash
+   */
+  public static byte[] computeHash(byte[] msg, SecretKey key) {
+    return createPassword(msg, key);
+  }
+  
+  /**
+   * Default constructor; generates a new master key and starts with an empty token cache
+   */
+  public JobTokenSecretManager() {
+    this.masterKey = generateSecret();
+    this.currentJobTokens = new TreeMap<String, SecretKey>();
+  }
+  
+  /**
+   * Create a new password/secret for the given job token identifier, computed from the identifier's bytes and the master key.
+   * @param identifier the job token identifier
+   * @return token password/secret
+   */
+  @Override
+  public byte[] createPassword(JobTokenIdentifier identifier) {
+    byte[] result = createPassword(identifier.getBytes(), masterKey);
+    return result;
+  }
+
+  /**
+   * Add the secret of a job's token to the cache, replacing any entry already cached for that job
+   * @param jobId the job that owns the token
+   * @param token the job token
+   */
+  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+    SecretKey tokenSecret = createSecretKey(token.getPassword());
+    synchronized (currentJobTokens) {
+      currentJobTokens.put(jobId, tokenSecret);
+    }
+  }
+
+  /**
+   * Remove the cached job token of a job from cache
+   * @param jobId the job whose token is to be removed
+   */
+  public void removeTokenForJob(String jobId) {
+    synchronized (currentJobTokens) {
+      currentJobTokens.remove(jobId);
+    }
+  }
+  
+  /**
+   * Look up the token password/secret for the given jobId.
+   * @param jobId the jobId to look up
+   * @return token password/secret as SecretKey
+   * @throws InvalidToken if no token secret is cached for the given jobId
+   */
+  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+    SecretKey tokenSecret = null;
+    synchronized (currentJobTokens) {
+      tokenSecret = currentJobTokens.get(jobId);
+    }
+    if (tokenSecret == null) {
+      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+    }
+    return tokenSecret;
+  }
+  
+  /**
+   * Look up the token password/secret for the given job token identifier.
+   * @param identifier the job token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken if no token secret is cached for the identifier's job
+   */
+  @Override
+  public byte[] retrievePassword(JobTokenIdentifier identifier)
+      throws InvalidToken {
+    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Dec 22 01:33:13 2009
@@ -28,6 +28,8 @@
 import java.util.List;
 import java.util.Set;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -41,15 +43,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import org.apache.commons.codec.binary.Base64;
-import java.security.GeneralSecurityException;
-
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -88,12 +86,12 @@
   // Decompression of map-outputs
   private final CompressionCodec codec;
   private final Decompressor decompressor;
-  private final byte[] shuffleJobToken;
+  private final SecretKey jobTokenSecret;
 
   public Fetcher(JobConf job, TaskAttemptID reduceId, 
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
-                 ExceptionReporter exceptionReporter, byte [] shuffleJobToken) {
+                 ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -101,7 +99,7 @@
     this.exceptionReporter = exceptionReporter;
     this.id = ++nextId;
     this.reduce = reduceId.getTaskID().getId();
-    this.shuffleJobToken = shuffleJobToken;
+    this.jobTokenSecret = jobTokenSecret;
     ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -196,9 +194,8 @@
       URLConnection connection = url.openConnection();
       
       // generate hash of the url
-      SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
-      String encHash = ssutil.hashFromString(msgToEncode);
+      String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
       
       // put url hash into http header
       connection.addRequestProperty(
@@ -215,7 +212,7 @@
       }
       LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
       // verify that replyHash is HMac of encHash
-      ssutil.verifyReply(replyHash, encHash);
+      SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
       ioErrs.increment(1);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Dec 22 01:33:13 2009
@@ -107,7 +107,7 @@
     for (int i=0; i < numFetchers; ++i) {
       fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                      reporter, metrics, this, 
-                                     reduceTask.getJobTokens().getShuffleJobToken());
+                                     reduceTask.getJobTokenSecret());
       fetchers[i].start();
     }
     

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java Tue Dec 22 01:33:13 2009
@@ -27,9 +27,14 @@
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 
+import javax.crypto.SecretKey;
+
 import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +45,7 @@
   private static URL baseUrl;
   private static File dir;
   private static final String JOB_ID = "job_20091117075357176_0001";
+  private static final String BAD_JOB_ID = "job_20091117075357176_0002";
   
   // create fake url
   private URL getMapOutputURL(String host)  throws MalformedURLException {
@@ -86,25 +92,26 @@
     URL url = getMapOutputURL(baseUrl.toString());
     String enc_str = SecureShuffleUtils.buildMsgFrom(url);
     URLConnection connectionGood = url.openConnection();
-
-    // create key 
-    byte [] key= SecureShuffleUtils.getNewEncodedKey();
     
-    // create fake TaskTracker - needed for keys storage
-    JobTokens jt = new JobTokens();
-    jt.setShuffleJobToken(key);
     TaskTracker tt  = new TaskTracker();
+    JobTokenSecretManager jtSecretManager = new JobTokenSecretManager();
+    // create fake TaskTracker - needed for keys storage
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(JOB_ID));
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(identifier,
+        jtSecretManager);
+    SecretKey tokenSecret = JobTokenSecretManager.createSecretKey(jt.getPassword());
     addJobToken(tt, JOB_ID, jt); // fake id
     server.setAttribute("task.tracker", tt);
 
     // encode the url
-    SecureShuffleUtils mac = new SecureShuffleUtils(key);
-    String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+    String urlHashGood = SecureShuffleUtils.generateHash(enc_str.getBytes(), tokenSecret); // valid hash
     
     // another the key
-    byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
-    mac = new SecureShuffleUtils(badKey);
-    String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash 
+    JobTokenIdentifier badIdentifier = new JobTokenIdentifier(new Text(BAD_JOB_ID));
+    Token<JobTokenIdentifier> badToken = new Token<JobTokenIdentifier>(badIdentifier,
+        jtSecretManager);
+    SecretKey badSecret = JobTokenSecretManager.createSecretKey(badToken.getPassword());
+    String urlHashBad = SecureShuffleUtils.generateHash(enc_str.getBytes(), badSecret); // invalid hash 
     
     // put url hash into http header
     connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
@@ -135,13 +142,13 @@
     } 
   }
   /*Note that this method is there for a unit testcase (TestShuffleJobToken)*/
-  void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+  void addJobToken(TaskTracker tt, String jobIdStr, Token<JobTokenIdentifier> token) {
     JobID jobId = JobID.forName(jobIdStr);
     TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
-    rJob.jobTokens = jt;
     synchronized (tt.runningJobs) {
       tt.runningJobs.put(jobId, rJob);
     }
+    tt.getJobTokenSecretManager().addTokenForJob(jobIdStr, token);
   }
 
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Dec 22 01:33:13 2009
@@ -37,9 +37,11 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
@@ -255,10 +257,10 @@
     if(!dir.exists())
       assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
     
-    File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+    File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
     FileOutputStream fos = new FileOutputStream(jobTokenFile);
     java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
-    JobTokens jt = new JobTokens();
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
     jt.write(out); // writing empty file, we don't the keys for this test 
     out.close();
   }