You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:40:10 UTC
svn commit: r1077097 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/security/
mapred/org/apache/hadoop/mapreduce/security/token/
test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:40:10 2011
New Revision: 1077097
URL: http://svn.apache.org/viewvc?rev=1077097&view=rev
Log:
commit a9f6714543b5b7623b8da159e8f2942b3f38dc40
Author: Jitendra Nath Pandey <ji...@yahoo-inc.com>
Date: Thu Jan 7 11:15:53 2010 -0800
MAPREDUCE-1250 from https://issues.apache.org/jira/secure/attachment/12429629/MR-1250-0_20.2.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1250. Refactor job token to use a common token interface.
+ (Jitendra Nath Pandey)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 03:40:10 2011
@@ -34,10 +34,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.security.token.Token;
import org.apache.log4j.LogManager;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -69,7 +71,7 @@ class Child {
// file name is passed thru env
String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
FileSystem localFs = FileSystem.getLocal(defaultConf);
- JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+ Token<JobTokenIdentifier> jt = loadJobToken(jobTokenFile, localFs);
LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
TaskUmbilicalProtocol umbilical =
@@ -150,7 +152,7 @@ class Child {
JobConf job = new JobConf(task.getJobFile());
// set the jobTokenFile into task
- task.setJobTokens(jt);
+ task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
@@ -219,16 +221,16 @@ class Child {
}
/**
- * load secret keys from a file
+ * load job token from a file
* @param jobTokenFile
* @param conf
* @throws IOException
*/
- private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS)
+ private static Token<JobTokenIdentifier> loadJobToken(String jobTokenFile, FileSystem localFS)
throws IOException {
Path localJobTokenFile = new Path (jobTokenFile);
FSDataInputStream in = localFS.open(localJobTokenFile);
- JobTokens jt = new JobTokens();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.readFields(in);
LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:40:10 2011
@@ -39,9 +39,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -49,6 +50,7 @@ import org.apache.hadoop.metrics.Metrics
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -503,7 +505,7 @@ class JobInProgress {
//
// generate security keys needed by Tasks
//
- generateJobTokens(fs, jobtracker.getSystemDirectoryForJob(jobId));
+ generateJobToken(fs);
//
// read input splits and create a map per a split
@@ -3077,30 +3079,25 @@ class JobInProgress {
);
}
}
-
+
/**
- * generate keys and save it into the file
- * @param jobDir
+ * generate job token and save it into the file
* @throws IOException
*/
- private void generateJobTokens(FileSystem fs, Path jobDir) throws IOException{
- Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+ private void generateJobToken(FileSystem fs) throws IOException {
+ Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+ Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+ // we need to create this file using the jobtracker's filesystem
FSDataOutputStream os = fs.create(keysFile);
- //create JobTokens file and add key to it
- JobTokens jt = new JobTokens();
- byte [] key;
- try {
- // new key
- key = SecureShuffleUtils.getNewEncodedKey();
- } catch (java.security.GeneralSecurityException e) {
- throw new IOException(e);
- }
- // remember the key
- jt.setShuffleJobToken(key);
- // other keys..
- jt.write(os);
+ //create JobToken file and write token to it
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+ .toString()));
+ Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
+ jobtracker.getJobTokenSecretManager());
+ token.setService(identifier.getJobId());
+ token.write(os);
os.close();
- LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+ LOG.debug("jobToken generated and stored in "+ keysFile.toUri().getPath());
}
-
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:40:10 2011
@@ -100,6 +100,7 @@ import org.apache.hadoop.util.VersionInf
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/*******************************************************
@@ -171,6 +172,13 @@ public class JobTracker implements MRCon
final static FsPermission SYSTEM_FILE_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx------
+ private final JobTokenSecretManager jobTokenSecretManager
+ = new JobTokenSecretManager();
+
+ JobTokenSecretManager getJobTokenSecretManager() {
+ return jobTokenSecretManager;
+ }
+
/**
* A client tried to submit a job before the Job Tracker was ready.
*/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:40:10 2011
@@ -48,6 +48,8 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -85,10 +87,7 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.commons.codec.binary.Base64;
-import java.security.GeneralSecurityException;
/** A Reduce task. */
class ReduceTask extends Task {
@@ -1152,14 +1151,14 @@ class ReduceTask extends Task {
private CompressionCodec codec = null;
private Decompressor decompressor = null;
- private final byte[] shuffleJobToken;
+ private final SecretKey jobTokenSecret;
- public MapOutputCopier(JobConf job, Reporter reporter, byte [] shuffleJobToken) {
+ public MapOutputCopier(JobConf job, Reporter reporter, SecretKey jobTokenSecret) {
setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
LOG.debug(getName() + " created");
this.reporter = reporter;
- this.shuffleJobToken = shuffleJobToken;
+ this.jobTokenSecret = jobTokenSecret;
shuffleConnectionTimeout =
job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
@@ -1389,9 +1388,8 @@ class ReduceTask extends Task {
URLConnection connection = url.openConnection();
// generate hash of the url
- SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- String encHash = ssutil.hashFromString(msgToEncode);
+ String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
// put url hash into http header
connection.addRequestProperty(
@@ -1407,7 +1405,7 @@ class ReduceTask extends Task {
}
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
// verify that replyHash is HMac of encHash
- ssutil.verifyReply(replyHash, encHash);
+ SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
// Validate header from map output
@@ -1893,8 +1891,8 @@ class ReduceTask extends Task {
// start all the copying threads
for (int i=0; i < numCopiers; i++) {
- MapOutputCopier copier = new MapOutputCopier(conf, reporter,
- reduceTask.getJobTokens().getShuffleJobToken());
+ MapOutputCopier copier = new MapOutputCopier(conf, reporter,
+ reduceTask.getJobTokenSecret());
copiers.add(copier);
copier.start();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:40:10 2011
@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -47,8 +49,6 @@ import org.apache.hadoop.io.serializer.D
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.security.JobTokens;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -151,6 +151,7 @@ abstract public class Task implements Wr
protected TaskUmbilicalProtocol umbilical;
private int numSlotsRequired;
protected JobTokens jobTokens=null; // storage of the secret keys
+ protected SecretKey tokenSecret;
////////////////////////////////////////////
// Constructors
@@ -204,19 +205,19 @@ abstract public class Task implements Wr
}
/**
- * set JobToken storage
- * @param jt
+ * Set the job token secret
+ * @param tokenSecret the secret
*/
- public void setJobTokens(JobTokens jt) {
- this.jobTokens = jt;
+ public void setJobTokenSecret(SecretKey tokenSecret) {
+ this.tokenSecret = tokenSecret;
}
/**
- * get JobToken storage
- * @return storage object
+ * Get the job token secret
+ * @return the token secret
*/
- public JobTokens getJobTokens() {
- return this.jobTokens;
+ public SecretKey getJobTokenSecret() {
+ return this.tokenSecret;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:40:10 2011
@@ -43,6 +43,7 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
+import javax.crypto.SecretKey;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -74,8 +75,9 @@ import org.apache.hadoop.mapred.TaskStat
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -90,6 +92,7 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
@@ -189,6 +192,13 @@ public class TaskTracker
*/
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+ private final JobTokenSecretManager jobTokenSecretManager
+ = new JobTokenSecretManager();
+
+ JobTokenSecretManager getJobTokenSecretManager() {
+ return jobTokenSecretManager;
+ }
+
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
boolean justStarted = true;
@@ -912,9 +922,9 @@ public class TaskTracker
localizeJobTokenFile(t.getUser(), jobId, localJobConf);
FSDataInputStream in = localFs.open(new Path(
rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
- JobTokens jt = new JobTokens();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.readFields(in);
- rjob.jobTokens = jt; // store JobToken object per job
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
rjob.localized = true;
taskController.initializeJob(jobId);
@@ -1544,7 +1554,7 @@ public class TaskTracker
synchronized(runningJobs) {
runningJobs.remove(jobId);
}
-
+ getJobTokenSecretManager().removeTokenForJob(jobId.toString());
}
@@ -2911,7 +2921,6 @@ public class TaskTracker
boolean localized;
boolean keepJobFiles;
FetchStatus f;
- JobTokens jobTokens;
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -3221,14 +3230,8 @@ public class TaskTracker
private void verifyRequest(HttpServletRequest request,
HttpServletResponse response, TaskTracker tracker, String jobId)
throws IOException {
- JobTokens jt = null;
- synchronized (tracker.runningJobs) {
- RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
- if (rjob == null) {
- throw new IOException("Unknown job " + jobId + "!!");
- }
- jt = rjob.jobTokens;
- }
+ SecretKey tokenSecret = tracker.getJobTokenSecretManager()
+ .retrieveTokenSecret(jobId);
// string to encrypt
String enc_str = SecureShuffleUtils.buildMsgFrom(request);
@@ -3242,17 +3245,16 @@ public class TaskTracker
LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
- SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
// verify - throws exception
try {
- ssutil.verifyReply(urlHashStr, enc_str);
+ SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
} catch (IOException ioe) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
throw ioe;
}
// verification passed - encode the reply
- String reply = ssutil.generateHash(urlHashStr.getBytes());
+ String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
len = reply.length();
@@ -3499,7 +3501,7 @@ public class TaskTracker
throws IOException {
// check if the tokenJob file is there..
Path skPath = new Path(systemDirectory,
- jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+ jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
FileStatus status = null;
long jobTokenSize = -1;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Mar 4 03:40:10 2011
@@ -22,15 +22,12 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.SecretKey;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.record.Utils;
/**
@@ -41,62 +38,17 @@ import org.apache.hadoop.record.Utils;
public class SecureShuffleUtils {
public static final String HTTP_HEADER_URL_HASH = "UrlHash";
public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
- public static KeyGenerator kg = null;
- public static String DEFAULT_ALG="HmacSHA1";
-
- private SecretKeySpec secretKey;
- private Mac mac;
-
/**
- * static generate keys
- * @return new encoded key
- * @throws NoSuchAlgorithmException
+ * file name used on HDFS for generated job token
*/
- public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
- SecretKeySpec key = generateKey(DEFAULT_ALG);
- return key.getEncoded();
- }
-
- private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
- if(kg==null) {
- kg = KeyGenerator.getInstance(alg);
- }
- return (SecretKeySpec) kg.generateKey();
- }
-
- /**
- * Create a util object with alg and key
- * @param sKeyEncoded
- * @throws NoSuchAlgorithmException
- * @throws InvalidKeyException
- */
- public SecureShuffleUtils(byte [] sKeyEncoded)
- throws IOException{
- secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
- try {
- mac = Mac.getInstance(DEFAULT_ALG);
- mac.init(secretKey);
- } catch (NoSuchAlgorithmException nae) {
- throw new IOException(nae);
- } catch( InvalidKeyException ie) {
- throw new IOException(ie);
- }
- }
-
- /**
- * get key as byte[]
- * @return encoded key
- */
- public byte [] getEncodedKey() {
- return secretKey.getEncoded();
- }
+ public static final String JOB_TOKEN_FILENAME = "jobToken";
/**
* Base64 encoded hash of msg
* @param msg
*/
- public String generateHash(byte[] msg) {
- return new String(Base64.encodeBase64(generateByteHash(msg)));
+ public static String generateHash(byte[] msg, SecretKey key) {
+ return new String(Base64.encodeBase64(generateByteHash(msg, key)));
}
/**
@@ -104,8 +56,8 @@ public class SecureShuffleUtils {
* @param msg
* @return
*/
- private byte[] generateByteHash(byte[] msg) {
- return mac.doFinal(msg);
+ private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+ return JobTokenSecretManager.computeHash(msg, key);
}
/**
@@ -113,20 +65,21 @@ public class SecureShuffleUtils {
* @param newHash
* @return true if is the same
*/
- private boolean verifyHash(byte[] hash, byte[] msg) {
- byte[] msg_hash = generateByteHash(msg);
+ private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+ byte[] msg_hash = generateByteHash(msg, key);
return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
}
/**
* Aux util to calculate hash of a String
* @param enc_str
+ * @param key
* @return Base64 encodedHash
* @throws IOException
*/
- public String hashFromString(String enc_str)
+ public static String hashFromString(String enc_str, SecretKey key)
throws IOException {
- return generateHash(enc_str.getBytes());
+ return generateHash(enc_str.getBytes(), key);
}
/**
@@ -135,11 +88,11 @@ public class SecureShuffleUtils {
* @param msg
* @throws IOException if not the same
*/
- public void verifyReply(String base64Hash, String msg)
+ public static void verifyReply(String base64Hash, String msg, SecretKey key)
throws IOException {
byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
- boolean res = verifyHash(hash, msg.getBytes());
+ boolean res = verifyHash(hash, msg.getBytes(), key);
if(res != true) {
throw new IOException("Verification of the hashReply failed");
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=1077097&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Fri Mar 4 03:40:10 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+public class JobTokenIdentifier extends TokenIdentifier {
+ private Text jobid;
+ final static Text KIND_NAME = new Text("mapreduce.job");
+
+ /**
+ * Create a job token identifier from a jobid
+ * @param jobid the jobid to use
+ */
+ public JobTokenIdentifier(Text jobid) {
+ this.jobid = jobid;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Text getKind() {
+ return KIND_NAME;
+ }
+
+ /**
+ * Get the jobid
+ * @return the jobid
+ */
+ public Text getJobId() {
+ return jobid;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobid.readFields(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ jobid.write(out);
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java?rev=1077097&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java Fri Mar 4 03:40:10 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+ private final SecretKey masterKey;
+ private final Map<String, SecretKey> currentJobTokens;
+
+ /**
+ * Convert the byte[] to a secret key
+ * @param key the byte[] to create the secret key from
+ * @return the secret key
+ */
+ public static SecretKey createSecretKey(byte[] key) {
+ return SecretManager.createSecretKey(key);
+ }
+
+ /**
+ * Compute the HMAC hash of the message using the key
+ * @param msg the message to hash
+ * @param key the key to use
+ * @return the computed hash
+ */
+ public static byte[] computeHash(byte[] msg, SecretKey key) {
+ return createPassword(msg, key);
+ }
+
+ /**
+ * Default constructor
+ */
+ public JobTokenSecretManager() {
+ this.masterKey = generateSecret();
+ this.currentJobTokens = new TreeMap<String, SecretKey>();
+ }
+
+ /**
+ * Create a new password/secret for the given job token identifier.
+ * @param identifier the job token identifier
+ * @return token password/secret
+ */
+ @Override
+ public byte[] createPassword(JobTokenIdentifier identifier) {
+ byte[] result = createPassword(identifier.getBytes(), masterKey);
+ return result;
+ }
+
+ /**
+ * Add the job token of a job to cache
+ * @param jobId the job that owns the token
+ * @param token the job token
+ */
+ public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+ SecretKey tokenSecret = createSecretKey(token.getPassword());
+ synchronized (currentJobTokens) {
+ currentJobTokens.put(jobId, tokenSecret);
+ }
+ }
+
+ /**
+ * Remove the cached job token of a job from cache
+ * @param jobId the job whose token is to be removed
+ */
+ public void removeTokenForJob(String jobId) {
+ synchronized (currentJobTokens) {
+ currentJobTokens.remove(jobId);
+ }
+ }
+
+ /**
+ * Look up the token password/secret for the given jobId.
+ * @param jobId the jobId to look up
+ * @return token password/secret as SecretKey
+ * @throws InvalidToken
+ */
+ public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+ SecretKey tokenSecret = null;
+ synchronized (currentJobTokens) {
+ tokenSecret = currentJobTokens.get(jobId);
+ }
+ if (tokenSecret == null) {
+ throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+ }
+ return tokenSecret;
+ }
+
+ /**
+ * Look up the token password/secret for the given job token identifier.
+ * @param identifier the job token identifier to look up
+ * @return token password/secret as byte[]
+ * @throws InvalidToken
+ */
+ @Override
+ public byte[] retrievePassword(JobTokenIdentifier identifier)
+ throws InvalidToken {
+ return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=1077097&r1=1077096&r2=1077097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java Fri Mar 4 03:40:10 2011
@@ -27,9 +27,14 @@ import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
+import javax.crypto.SecretKey;
+
import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +45,7 @@ public class TestShuffleJobToken {
private static URL baseUrl;
private static File dir;
private static final String JOB_ID = "job_20091117075357176_0001";
+ private static final String BAD_JOB_ID = "job_20091117075357176_0002";
// create fake url
private URL getMapOutputURL(String host) throws MalformedURLException {
@@ -86,25 +92,26 @@ public class TestShuffleJobToken {
URL url = getMapOutputURL(baseUrl.toString());
String enc_str = SecureShuffleUtils.buildMsgFrom(url);
URLConnection connectionGood = url.openConnection();
-
- // create key
- byte [] key= SecureShuffleUtils.getNewEncodedKey();
- // create fake TaskTracker - needed for keys storage
- JobTokens jt = new JobTokens();
- jt.setShuffleJobToken(key);
TaskTracker tt = new TaskTracker();
+ JobTokenSecretManager jtSecretManager = new JobTokenSecretManager();
+ // create fake TaskTracker - needed for keys storage
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(JOB_ID));
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(identifier,
+ jtSecretManager);
+ SecretKey tokenSecret = JobTokenSecretManager.createSecretKey(jt.getPassword());
addJobToken(tt, JOB_ID, jt); // fake id
server.setAttribute("task.tracker", tt);
// encode the url
- SecureShuffleUtils mac = new SecureShuffleUtils(key);
- String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+ String urlHashGood = SecureShuffleUtils.generateHash(enc_str.getBytes(), tokenSecret); // valid hash
// another the key
- byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
- mac = new SecureShuffleUtils(badKey);
- String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash
+ JobTokenIdentifier badIdentifier = new JobTokenIdentifier(new Text(BAD_JOB_ID));
+ Token<JobTokenIdentifier> badToken = new Token<JobTokenIdentifier>(badIdentifier,
+ jtSecretManager);
+ SecretKey badSecret = JobTokenSecretManager.createSecretKey(badToken.getPassword());
+ String urlHashBad = SecureShuffleUtils.generateHash(enc_str.getBytes(), badSecret); // invalid hash
// put url hash into http header
connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
@@ -135,13 +142,13 @@ public class TestShuffleJobToken {
}
}
/*Note that this method is there for a unit testcase (TestShuffleJobToken)*/
- void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+ void addJobToken(TaskTracker tt, String jobIdStr, Token<JobTokenIdentifier> token) {
JobID jobId = JobID.forName(jobIdStr);
TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
- rJob.jobTokens = jt;
synchronized (tt.runningJobs) {
tt.runningJobs.put(jobId, rJob);
}
+ tt.getJobTokenSecretManager().addTokenForJob(jobIdStr, token);
}
}