You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/12/22 02:33:14 UTC
svn commit: r893055 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/security/
src/java/org/apache/hadoop/mapreduce/security/token/
src/java/org/apache/hadoop/mapreduce/task/reduce/ src/test/mapred/org/apache/hadoop/mapred/
Author: ddas
Date: Tue Dec 22 01:33:13 2009
New Revision: 893055
URL: http://svn.apache.org/viewvc?rev=893055&view=rev
Log:
MAPREDUCE-1250. Refactors the JobToken to use Common's Token interface. Contributed by Kan Zhang.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Dec 22 01:33:13 2009
@@ -87,6 +87,9 @@
MAPREDUCE-181. Changes the job submission process to be secure.
(Devaraj Das)
+ MAPREDUCE-1250. Refactors the JobToken to use Common's Token interface.
+ (Kan Zhang via ddas)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue Dec 22 01:33:13 2009
@@ -34,10 +34,12 @@
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.security.token.Token;
import org.apache.log4j.LogManager;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -73,7 +75,7 @@
// file name is passed thru env
String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
FileSystem localFs = FileSystem.getLocal(defaultConf);
- JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+ Token<JobTokenIdentifier> jt = loadJobToken(jobTokenFile, localFs);
LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
TaskUmbilicalProtocol umbilical =
@@ -154,7 +156,7 @@
JobConf job = new JobConf(task.getJobFile());
// set the jobTokenFile into task
- task.setJobTokens(jt);
+ task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
// setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
// can only see files down and under attemtdir only.
@@ -226,16 +228,16 @@
}
/**
- * load secret keys from a file
+ * load job token from a file
* @param jobTokenFile
* @param conf
* @throws IOException
*/
- private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS)
+ private static Token<JobTokenIdentifier> loadJobToken(String jobTokenFile, FileSystem localFS)
throws IOException {
Path localJobTokenFile = new Path (jobTokenFile);
FSDataInputStream in = localFS.open(localJobTokenFile);
- JobTokens jt = new JobTokens();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.readFields(in);
LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Dec 22 01:33:13 2009
@@ -43,6 +43,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
@@ -62,7 +63,7 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
@@ -74,6 +75,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -591,7 +593,7 @@
//
// generate security keys needed by Tasks
//
- generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+ generateJobToken();
//
// read input splits and create a map per a split
@@ -3521,29 +3523,23 @@
}
/**
- * generate keys and save it into the file
- * @param jobDir
+ * generate job token and save it into the file
* @throws IOException
*/
- private void generateJobTokens(Path jobDir) throws IOException{
- Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+ private void generateJobToken() throws IOException{
+ Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+ Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
// we need to create this file using the jobtracker's filesystem
FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
- //create JobTokens file and add key to it
- JobTokens jt = new JobTokens();
- byte [] key;
- try {
- // new key
- key = SecureShuffleUtils.getNewEncodedKey();
- } catch (java.security.GeneralSecurityException e) {
- throw new IOException(e);
- }
- // remember the key
- jt.setShuffleJobToken(key);
- // other keys..
- jt.write(os);
+ //create JobToken file and write token to it
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+ .toString()));
+ Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
+ jobtracker.getJobTokenSecretManager());
+ token.setService(identifier.getJobId());
+ token.write(os);
os.close();
- LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+ LOG.debug("jobToken generated and stored in "+ keysFile.toUri().getPath());
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Dec 22 01:33:13 2009
@@ -79,6 +79,7 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
@@ -172,6 +173,13 @@
static final Clock DEFAULT_CLOCK = new Clock();
private final JobHistory jobHistory;
+
+ private final JobTokenSecretManager jobTokenSecretManager
+ = new JobTokenSecretManager();
+
+ JobTokenSecretManager getJobTokenSecretManager() {
+ return jobTokenSecretManager;
+ }
private MRAsyncDiskService asyncDiskService;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Dec 22 01:33:13 2009
@@ -28,6 +28,8 @@
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -50,8 +52,6 @@
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-import org.apache.hadoop.mapreduce.security.JobTokens;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
@@ -140,7 +140,7 @@
protected final Counters.Counter mergedMapOutputsCounter;
private int numSlotsRequired;
protected TaskUmbilicalProtocol umbilical;
- protected JobTokens jobTokens=null; // storage of the secret keys
+ protected SecretKey tokenSecret;
////////////////////////////////////////////
// Constructors
@@ -199,19 +199,19 @@
}
/**
- * set JobToken storage
- * @param jt
+ * Set the job token secret
+ * @param tokenSecret the secret
*/
- public void setJobTokens(JobTokens jt) {
- this.jobTokens = jt;
+ public void setJobTokenSecret(SecretKey tokenSecret) {
+ this.tokenSecret = tokenSecret;
}
/**
- * get JobToken storage
- * @return storage object
+ * Get the job token secret
+ * @return the token secret
*/
- public JobTokens getJobTokens() {
- return this.jobTokens;
+ public SecretKey getJobTokenSecret() {
+ return this.tokenSecret;
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Dec 22 01:33:13 2009
@@ -43,6 +43,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
+import javax.crypto.SecretKey;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -73,8 +74,9 @@
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -95,6 +97,7 @@
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
@@ -193,6 +196,8 @@
*/
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+ private final JobTokenSecretManager jobTokenSecretManager
+ = new JobTokenSecretManager();
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
@@ -426,6 +431,10 @@
}
}
+ JobTokenSecretManager getJobTokenSecretManager() {
+ return jobTokenSecretManager;
+ }
+
Localizer getLocalizer() {
return localizer;
}
@@ -896,10 +905,9 @@
localJobConf.getKeepFailedTaskFiles());
FSDataInputStream in = localFs.open(new Path(
rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
- JobTokens jt = new JobTokens();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.readFields(in);
- rjob.jobTokens = jt; // store JobToken object per job
-
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
rjob.localized = true;
}
}
@@ -1598,6 +1606,7 @@
synchronized(runningJobs) {
runningJobs.remove(jobId);
}
+ getJobTokenSecretManager().removeTokenForJob(jobId.toString());
}
/**
@@ -2940,7 +2949,6 @@
boolean localized;
boolean keepJobFiles;
FetchStatus f;
- JobTokens jobTokens;
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -3306,14 +3314,8 @@
private void verifyRequest(HttpServletRequest request,
HttpServletResponse response, TaskTracker tracker, String jobId)
throws IOException {
- JobTokens jt = null;
- synchronized (tracker.runningJobs) {
- RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
- if (rjob == null) {
- throw new IOException("Unknown job " + jobId + "!!");
- }
- jt = rjob.jobTokens;
- }
+ SecretKey tokenSecret = tracker.getJobTokenSecretManager()
+ .retrieveTokenSecret(jobId);
// string to encrypt
String enc_str = SecureShuffleUtils.buildMsgFrom(request);
@@ -3327,17 +3329,16 @@
LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
- SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
// verify - throws exception
try {
- ssutil.verifyReply(urlHashStr, enc_str);
+ SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
} catch (IOException ioe) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
throw ioe;
}
// verification passed - encode the reply
- String reply = ssutil.generateHash(urlHashStr.getBytes());
+ String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
len = reply.length();
@@ -3572,7 +3573,7 @@
throws IOException {
// check if the tokenJob file is there..
Path skPath = new Path(systemDirectory,
- jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+ jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
FileStatus status = null;
long jobTokenSize = -1;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java Tue Dec 22 01:33:13 2009
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * get/set, store/load security keys
- * key's value - byte[]
- * store/load from DataInput/DataOuptut
- * List of currently store keys:
- * jobToken for secure shuffle HTTP Get
- *
- */
-@InterfaceAudience.Private
-public class JobTokens implements Writable {
- /**
- * file name used on HDFS for generated keys
- */
- public static final String JOB_TOKEN_FILENAME = "jobTokens";
-
- private byte [] shuffleJobToken = null; // jobtoken for shuffle (map output)
-
-
- /**
- * returns the key value for the alias
- * @return key for this alias
- */
- public byte[] getShuffleJobToken() {
- return shuffleJobToken;
- }
-
- /**
- * sets the jobToken
- * @param key
- */
- public void setShuffleJobToken(byte[] key) {
- shuffleJobToken = key;
- }
-
- /**
- * stores all the keys to DataOutput
- * @param out
- * @throws IOException
- */
- @Override
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeCompressedByteArray(out, shuffleJobToken);
- }
-
- /**
- * loads all the keys
- * @param in
- * @throws IOException
- */
- @Override
- public void readFields(DataInput in) throws IOException {
- shuffleJobToken = WritableUtils.readCompressedByteArray(in);
- }
-}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Tue Dec 22 01:33:13 2009
@@ -22,16 +22,13 @@
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.SecretKey;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.record.Utils;
/**
@@ -43,62 +40,17 @@
public class SecureShuffleUtils {
public static final String HTTP_HEADER_URL_HASH = "UrlHash";
public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
- public static KeyGenerator kg = null;
- public static String DEFAULT_ALG="HmacSHA1";
-
- private SecretKeySpec secretKey;
- private Mac mac;
-
/**
- * static generate keys
- * @return new encoded key
- * @throws NoSuchAlgorithmException
+ * file name used on HDFS for generated job token
*/
- public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
- SecretKeySpec key = generateKey(DEFAULT_ALG);
- return key.getEncoded();
- }
-
- private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
- if(kg==null) {
- kg = KeyGenerator.getInstance(alg);
- }
- return (SecretKeySpec) kg.generateKey();
- }
-
- /**
- * Create a util object with alg and key
- * @param sKeyEncoded
- * @throws NoSuchAlgorithmException
- * @throws InvalidKeyException
- */
- public SecureShuffleUtils(byte [] sKeyEncoded)
- throws IOException{
- secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
- try {
- mac = Mac.getInstance(DEFAULT_ALG);
- mac.init(secretKey);
- } catch (NoSuchAlgorithmException nae) {
- throw new IOException(nae);
- } catch( InvalidKeyException ie) {
- throw new IOException(ie);
- }
- }
-
- /**
- * get key as byte[]
- * @return encoded key
- */
- public byte [] getEncodedKey() {
- return secretKey.getEncoded();
- }
+ public static final String JOB_TOKEN_FILENAME = "jobToken";
/**
* Base64 encoded hash of msg
* @param msg
*/
- public String generateHash(byte[] msg) {
- return new String(Base64.encodeBase64(generateByteHash(msg)));
+ public static String generateHash(byte[] msg, SecretKey key) {
+ return new String(Base64.encodeBase64(generateByteHash(msg, key)));
}
/**
@@ -106,8 +58,8 @@
* @param msg
* @return
*/
- private byte[] generateByteHash(byte[] msg) {
- return mac.doFinal(msg);
+ private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+ return JobTokenSecretManager.computeHash(msg, key);
}
/**
@@ -115,20 +67,21 @@
* @param newHash
* @return true if is the same
*/
- private boolean verifyHash(byte[] hash, byte[] msg) {
- byte[] msg_hash = generateByteHash(msg);
+ private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+ byte[] msg_hash = generateByteHash(msg, key);
return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
}
/**
* Aux util to calculate hash of a String
* @param enc_str
+ * @param key
* @return Base64 encodedHash
* @throws IOException
*/
- public String hashFromString(String enc_str)
+ public static String hashFromString(String enc_str, SecretKey key)
throws IOException {
- return generateHash(enc_str.getBytes());
+ return generateHash(enc_str.getBytes(), key);
}
/**
@@ -137,11 +90,11 @@
* @param msg
* @throws IOException if not the same
*/
- public void verifyReply(String base64Hash, String msg)
+ public static void verifyReply(String base64Hash, String msg, SecretKey key)
throws IOException {
byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
- boolean res = verifyHash(hash, msg.getBytes());
+ boolean res = verifyHash(hash, msg.getBytes(), key);
if(res != true) {
throw new IOException("Verification of the hashReply failed");
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=893055&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Tue Dec 22 01:33:13 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * The token identifier for job token
+ */
+@InterfaceAudience.Private
+public class JobTokenIdentifier extends TokenIdentifier {
+ private Text jobid;
+ final static Text KIND_NAME = new Text("mapreduce.job");
+
+ /**
+ * Create a job token identifier from a jobid
+ * @param jobid the jobid to use
+ */
+ public JobTokenIdentifier(Text jobid) {
+ this.jobid = jobid;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Text getKind() {
+ return KIND_NAME;
+ }
+
+ /**
+ * Get the jobid
+ * @return the jobid
+ */
+ public Text getJobId() {
+ return jobid;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ jobid.readFields(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ jobid.write(out);
+ }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java?rev=893055&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java Tue Dec 22 01:33:13 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+@InterfaceAudience.Private
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+ private final SecretKey masterKey;
+ private final Map<String, SecretKey> currentJobTokens;
+
+ /**
+ * Convert the byte[] to a secret key
+ * @param key the byte[] to create the secret key from
+ * @return the secret key
+ */
+ public static SecretKey createSecretKey(byte[] key) {
+ return SecretManager.createSecretKey(key);
+ }
+
+ /**
+ * Compute the HMAC hash of the message using the key
+ * @param msg the message to hash
+ * @param key the key to use
+ * @return the computed hash
+ */
+ public static byte[] computeHash(byte[] msg, SecretKey key) {
+ return createPassword(msg, key);
+ }
+
+ /**
+ * Default constructor
+ */
+ public JobTokenSecretManager() {
+ this.masterKey = generateSecret();
+ this.currentJobTokens = new TreeMap<String, SecretKey>();
+ }
+
+ /**
+ * Create a new password/secret for the given job token identifier.
+ * @param identifier the job token identifier
+ * @return token password/secret
+ */
+ @Override
+ public byte[] createPassword(JobTokenIdentifier identifier) {
+ byte[] result = createPassword(identifier.getBytes(), masterKey);
+ return result;
+ }
+
+ /**
+ * Add the job token of a job to cache
+ * @param jobId the job that owns the token
+ * @param token the job token
+ */
+ public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+ SecretKey tokenSecret = createSecretKey(token.getPassword());
+ synchronized (currentJobTokens) {
+ currentJobTokens.put(jobId, tokenSecret);
+ }
+ }
+
+ /**
+ * Remove the cached job token of a job from cache
+ * @param jobId the job whose token is to be removed
+ */
+ public void removeTokenForJob(String jobId) {
+ synchronized (currentJobTokens) {
+ currentJobTokens.remove(jobId);
+ }
+ }
+
+ /**
+ * Look up the token password/secret for the given jobId.
+ * @param jobId the jobId to look up
+ * @return token password/secret as SecretKey
+ * @throws InvalidToken
+ */
+ public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+ SecretKey tokenSecret = null;
+ synchronized (currentJobTokens) {
+ tokenSecret = currentJobTokens.get(jobId);
+ }
+ if (tokenSecret == null) {
+ throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+ }
+ return tokenSecret;
+ }
+
+ /**
+ * Look up the token password/secret for the given job token identifier.
+ * @param identifier the job token identifier to look up
+ * @return token password/secret as byte[]
+ * @throws InvalidToken
+ */
+ @Override
+ public byte[] retrievePassword(JobTokenIdentifier identifier)
+ throws InvalidToken {
+ return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+ }
+
+}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Dec 22 01:33:13 2009
@@ -28,6 +28,8 @@
import java.util.List;
import java.util.Set;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
@@ -41,15 +43,11 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.commons.codec.binary.Base64;
-import java.security.GeneralSecurityException;
-
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -88,12 +86,12 @@
// Decompression of map-outputs
private final CompressionCodec codec;
private final Decompressor decompressor;
- private final byte[] shuffleJobToken;
+ private final SecretKey jobTokenSecret;
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
- ExceptionReporter exceptionReporter, byte [] shuffleJobToken) {
+ ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -101,7 +99,7 @@
this.exceptionReporter = exceptionReporter;
this.id = ++nextId;
this.reduce = reduceId.getTaskID().getId();
- this.shuffleJobToken = shuffleJobToken;
+ this.jobTokenSecret = jobTokenSecret;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -196,9 +194,8 @@
URLConnection connection = url.openConnection();
// generate hash of the url
- SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- String encHash = ssutil.hashFromString(msgToEncode);
+ String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
// put url hash into http header
connection.addRequestProperty(
@@ -215,7 +212,7 @@
}
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
// verify that replyHash is HMac of encHash
- ssutil.verifyReply(replyHash, encHash);
+ SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
ioErrs.increment(1);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Dec 22 01:33:13 2009
@@ -107,7 +107,7 @@
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
- reduceTask.getJobTokens().getShuffleJobToken());
+ reduceTask.getJobTokenSecret());
fetchers[i].start();
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java Tue Dec 22 01:33:13 2009
@@ -27,9 +27,14 @@
import java.net.URLConnection;
import java.security.GeneralSecurityException;
+import javax.crypto.SecretKey;
+
import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +45,7 @@
private static URL baseUrl;
private static File dir;
private static final String JOB_ID = "job_20091117075357176_0001";
+ private static final String BAD_JOB_ID = "job_20091117075357176_0002";
// create fake url
private URL getMapOutputURL(String host) throws MalformedURLException {
@@ -86,25 +92,26 @@
URL url = getMapOutputURL(baseUrl.toString());
String enc_str = SecureShuffleUtils.buildMsgFrom(url);
URLConnection connectionGood = url.openConnection();
-
- // create key
- byte [] key= SecureShuffleUtils.getNewEncodedKey();
- // create fake TaskTracker - needed for keys storage
- JobTokens jt = new JobTokens();
- jt.setShuffleJobToken(key);
TaskTracker tt = new TaskTracker();
+ JobTokenSecretManager jtSecretManager = new JobTokenSecretManager();
+ // create fake TaskTracker - needed for keys storage
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(JOB_ID));
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(identifier,
+ jtSecretManager);
+ SecretKey tokenSecret = JobTokenSecretManager.createSecretKey(jt.getPassword());
addJobToken(tt, JOB_ID, jt); // fake id
server.setAttribute("task.tracker", tt);
// encode the url
- SecureShuffleUtils mac = new SecureShuffleUtils(key);
- String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+ String urlHashGood = SecureShuffleUtils.generateHash(enc_str.getBytes(), tokenSecret); // valid hash
// another the key
- byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
- mac = new SecureShuffleUtils(badKey);
- String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash
+ JobTokenIdentifier badIdentifier = new JobTokenIdentifier(new Text(BAD_JOB_ID));
+ Token<JobTokenIdentifier> badToken = new Token<JobTokenIdentifier>(badIdentifier,
+ jtSecretManager);
+ SecretKey badSecret = JobTokenSecretManager.createSecretKey(badToken.getPassword());
+ String urlHashBad = SecureShuffleUtils.generateHash(enc_str.getBytes(), badSecret); // invalid hash
// put url hash into http header
connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
@@ -135,13 +142,13 @@
}
}
/*Note that this method is there for a unit testcase (TestShuffleJobToken)*/
- void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+ void addJobToken(TaskTracker tt, String jobIdStr, Token<JobTokenIdentifier> token) {
JobID jobId = JobID.forName(jobIdStr);
TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
- rJob.jobTokens = jt;
synchronized (tt.runningJobs) {
tt.runningJobs.put(jobId, rJob);
}
+ tt.getJobTokenSecretManager().addTokenForJob(jobIdStr, token);
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=893055&r1=893054&r2=893055&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Dec 22 01:33:13 2009
@@ -37,9 +37,11 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
@@ -255,10 +257,10 @@
if(!dir.exists())
assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
- File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+ File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
FileOutputStream fos = new FileOutputStream(jobTokenFile);
java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
- JobTokens jt = new JobTokens();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
jt.write(out); // writing empty file, we don't the keys for this test
out.close();
}