Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/11/21 00:43:46 UTC
svn commit: r882790 - in /hadoop/mapreduce/trunk: ./
src/contrib/capacity-scheduler/ src/contrib/streaming/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/security/ src/java/org/apache/hado...
Author: ddas
Date: Fri Nov 20 23:43:45 2009
New Revision: 882790
URL: http://svn.apache.org/viewvc?rev=882790&view=rev
Log:
MAPREDUCE-1026. Adds mutual authentication of the shuffle transfers using a shared, JobTracker-generated key. Contributed by Boris Shkolnik.
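
In outline, the patch implements a keyed-HMAC challenge/response between the reduce-side fetcher and the TaskTracker's shuffle servlet: the JobTracker generates a per-job key, the fetcher sends HMAC(url) in a "UrlHash" request header, and the TaskTracker verifies it and answers with HMAC(UrlHash) in a "ReplyHash" header, which the fetcher verifies in turn. The self-contained sketch below compresses that round trip into one program; the class and variable names (ShuffleHandshakeDemo, jobKey, msg) are illustrative only and not part of this commit, though the HmacSHA1/Base64 construction mirrors the SecureShuffleUtils code further down.

    import javax.crypto.KeyGenerator;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import org.apache.commons.codec.binary.Base64;

    public class ShuffleHandshakeDemo {
      // Base64(HmacSHA1(key, msg)) -- the same construction SecureShuffleUtils uses.
      static String hmac(byte[] key, String msg) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(key, "HmacSHA1"));
        return new String(Base64.encodeBase64(mac.doFinal(msg.getBytes())));
      }

      public static void main(String[] args) throws Exception {
        // JobTracker: generate one shared key per job and ship it to the tasks.
        byte[] jobKey = KeyGenerator.getInstance("HmacSHA1").generateKey().getEncoded();

        // Fetcher: hash the request it is about to send (port + path + "?" + query)
        // and pass the hash along in the "UrlHash" request header.
        String msg = "8080/mapOutput?job=job_1_0001&reduce=0&map=attempt_1";
        String urlHash = hmac(jobKey, msg);

        // TaskTracker: recompute the hash with its own copy of the key; a mismatch
        // means the fetcher does not hold the job key, so the servlet replies 401.
        if (!hmac(jobKey, msg).equals(urlHash))
          throw new SecurityException("fetcher cannot be authenticated");

        // TaskTracker: prove it holds the key too by returning HMAC(urlHash)
        // in the "ReplyHash" response header.
        String replyHash = hmac(jobKey, urlHash);

        // Fetcher: verify the reply -- authentication is now mutual.
        if (!hmac(jobKey, urlHash).equals(replyHash))
          throw new SecurityException("TaskTracker cannot be authenticated");
        System.out.println("mutual authentication succeeded");
      }
    }
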
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml
hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 20 23:43:45 2009
@@ -9,6 +9,10 @@
MAPREDUCE-1017. Compression and output splitting for Sqoop.
(Aaron Kimball via tomwhite)
+ MAPREDUCE-1026. Adds mutual authentication of the shuffle
+ transfers using a shared, JobTracker-generated key.
+ (Boris Shkolnik via ddas)
+
IMPROVEMENTS
MAPREDUCE-1198. Alternatively schedule different types of tasks in
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml Fri Nov 20 23:43:45 2009
@@ -80,5 +80,9 @@
name="paranamer"
rev="${paranamer.version}"
conf="common->default"/>
+ <dependency org="commons-codec"
+ name="commons-codec"
+ rev="${commons-codec.version}"
+ conf="common->default"/>
</dependencies>
</ivy-module>
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml Fri Nov 20 23:43:45 2009
@@ -80,5 +80,9 @@
name="paranamer"
rev="${paranamer.version}"
conf="common->default"/>
+ <dependency org="commons-codec"
+ name="commons-codec"
+ rev="${commons-codec.version}"
+ conf="common->default"/>
</dependencies>
</ivy-module>
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Fri Nov 20 23:43:45 2009
@@ -26,11 +26,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
@@ -65,6 +69,13 @@
int jvmIdInt = Integer.parseInt(args[3]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
+
+ // file name is passed through the environment
+ String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
+ FileSystem localFs = FileSystem.getLocal(defaultConf);
+ JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+ LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
+
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID,
@@ -141,7 +152,10 @@
//are viewable immediately
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
JobConf job = new JobConf(task.getJobFile());
-
+
+ // set the jobTokenFile into task
+ task.setJobTokens(jt);
+
// setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
can only see files under attemptdir.
TaskRunner.setupChildMapredLocalDirs(task, job);
@@ -210,4 +224,22 @@
LogManager.shutdown();
}
}
+
+ /**
+ * load secret keys from a file
+ * @param jobTokenFile
+ * @param localFS
+ * @throws IOException
+ */
+ private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS)
+ throws IOException {
+ Path localJobTokenFile = new Path (jobTokenFile);
+ FSDataInputStream in = localFS.open(localJobTokenFile);
+ JobTokens jt = new JobTokens();
+ jt.readFields(in);
+
+ LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
+ in.close();
+ return jt;
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Nov 20 23:43:45 2009
@@ -63,6 +63,8 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -71,6 +73,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
/*************************************************************
* JobInProgress maintains all the info for keeping
@@ -576,6 +579,11 @@
setPriority(this.priority);
//
+ // generate security keys needed by Tasks
+ //
+ generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+
+ //
// read input splits and create a map per a split
//
String jobFile = profile.getJobFile();
@@ -3505,4 +3513,30 @@
LOG.debug("Failed to delete file " + f);
}
}
+
+ /**
+ * generate the job's security keys and save them to a file under jobDir
+ * @param jobDir
+ * @throws IOException
+ */
+ private void generateJobTokens(Path jobDir) throws IOException{
+ Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+ FSDataOutputStream os = fs.create(keysFile);
+ //create JobTokens file and add key to it
+ JobTokens jt = new JobTokens();
+ byte [] key;
+ try {
+ // new key
+ key = SecureShuffleUtils.getNewEncodedKey();
+ } catch (java.security.GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ // remember the key
+ jt.setShuffleJobToken(key);
+ // other keys..
+ jt.write(os);
+ os.close();
+ LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+ }
+
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Nov 20 23:43:45 2009
@@ -21,8 +21,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
@@ -51,12 +49,15 @@
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
/**
* Base class for tasks.
@@ -159,6 +160,7 @@
protected final Counters.Counter mergedMapOutputsCounter;
private int numSlotsRequired;
protected TaskUmbilicalProtocol umbilical;
+ protected JobTokens jobTokens=null; // storage of the secret keys
////////////////////////////////////////////
// Constructors
@@ -211,6 +213,23 @@
public JobID getJobID() {
return taskId.getJobID();
}
+
+ /**
+ * set JobToken storage
+ * @param jt
+ */
+ public void setJobTokens(JobTokens jt) {
+ this.jobTokens = jt;
+ }
+
+ /**
+ * get JobToken storage
+ * @return storage object
+ */
+ public JobTokens getJobTokens() {
+ return this.jobTokens;
+ }
+
/**
* Get the index of this task within the job.
@@ -1299,7 +1318,6 @@
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
- }
-
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Nov 20 23:43:45 2009
@@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
@@ -505,6 +506,11 @@
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ // put jobTokenFile name into env
+ String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+ LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+ env.put("JOB_TOKEN_FILE", jobTokenFile);
+
// for the child of task jvm, set hadoop.root.logger
env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Nov 20 23:43:45 2009
@@ -73,6 +73,8 @@
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -186,7 +188,7 @@
* Map from taskId -> TaskInProgress.
*/
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
- Map<JobID, RunningJob> runningJobs = null;
+ Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
@@ -215,6 +217,7 @@
private static final String JARSDIR = "jars";
static final String LOCAL_SPLIT_FILE = "split.dta";
static final String JOBFILE = "job.xml";
+ static final String JOB_TOKEN_FILE="jobToken"; //localized file
static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;
@@ -443,6 +446,11 @@
static String getLocalJobConfFile(String user, String jobid) {
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
}
+
+ static String getLocalJobTokenFile(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+ }
+
static String getTaskConfFile(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
@@ -873,6 +881,12 @@
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
+ FSDataInputStream in = localFs.open(new Path(
+ rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
+ JobTokens jt = new JobTokens();
+ jt.readFields(in);
+ rjob.jobTokens = jt; // store JobToken object per job
+
rjob.localized = true;
}
}
@@ -924,6 +938,8 @@
// Download the job.jar for this job from the system FS
localizeJobJarFile(userName, jobId, localFs, localJobConf);
+ // save local copy of JobToken file
+ localizeJobTokenFile(userName, jobId, localJobConf);
return localJobConf;
}
@@ -2879,6 +2895,7 @@
boolean localized;
boolean keepJobFiles;
FetchStatus f;
+ JobTokens jobTokens;
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -3067,6 +3084,8 @@
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");
+ verifyRequest(request, response, tracker, jobId);
+
int numMaps = 0;
try {
shuffleMetrics.serverHandlerBusy();
@@ -3230,7 +3249,58 @@
" from map: " + mapId + " given " + info.partLength + "/" +
info.rawLength);
}
+
+ /**
+ * verify that the request carries the correct hash of the URL,
+ * and add a field to the reply header with the hash of that hash
+ * @param request
+ * @param response
+ * @param tracker the TaskTracker that holds this job's tokens
+ * @param jobId the job whose shuffle token is used
+ * @throws IOException
+ */
+ private void verifyRequest(HttpServletRequest request,
+ HttpServletResponse response, TaskTracker tracker, String jobId)
+ throws IOException {
+ JobTokens jt = null;
+ synchronized (tracker.runningJobs) {
+ RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+ if (rjob == null) {
+ throw new IOException("Unknown job " + jobId + "!!");
+ }
+ jt = rjob.jobTokens;
+ }
+ // string to hash, built from the request URL
+ String enc_str = SecureShuffleUtils.buildMsgFrom(request);
+
+ // hash from the fetcher
+ String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ if(urlHashStr == null) {
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ throw new IOException("fetcher cannot be authenticated");
+ }
+ int len = urlHashStr.length();
+ LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
+ urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
+
+ SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
+ // verify - throws exception
+ try {
+ ssutil.verifyReply(urlHashStr, enc_str);
+ } catch (IOException ioe) {
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+ throw ioe;
+ }
+
+ // verification passed - encode the reply
+ String reply = ssutil.generateHash(urlHashStr.getBytes());
+ response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+
+ len = reply.length();
+ LOG.debug("Fetcher request verfied. enc_str="+enc_str+";reply="
+ +reply.substring(len-len/2, len-1));
+ }
}
+
// get the full paths of the directory in all the local disks.
private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
@@ -3444,4 +3514,37 @@
TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
return distributedCacheManager;
}
+
+ /**
+ * Download the job-token file from the FS and save on local fs.
+ * @param user
+ * @param jobId
+ * @param jobConf
+ * The local path of the downloaded file is stored into the jobConf.
+ * @throws IOException
+ */
+ private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+ throws IOException {
+ // check if the tokenJob file is there..
+ Path skPath = new Path(systemDirectory,
+ jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+
+ FileStatus status = null;
+ long jobTokenSize = -1;
+ status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+ jobTokenSize = status.getLen();
+
+ Path localJobTokenFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
+ jobId.toString()), jobTokenSize, fConf);
+
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
+ " to " + localJobTokenFile.toUri().getPath());
+
+ // Download job_token
+ systemFS.copyToLocalFile(skPath, localJobTokenFile);
+ // set it into jobConf to transfer the name to TaskRunner
+ jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
+ }
+
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Nov 20 23:43:45 2009
@@ -220,6 +220,7 @@
"mapreduce.reduce.merge.memtomem.threshold";
public static final String REDUCE_MEMTOMEM_ENABLED =
"mapreduce.reduce.merge.memtomem.enabled";
+ public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
/**
* Return the configuration for the job.
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java?rev=882790&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java Fri Nov 20 23:43:45 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * get/set, store/load security keys
+ * key's value - byte[]
+ * store/load from DataInput/DataOutput
+ * List of currently stored keys:
+ * jobToken for secure shuffle HTTP GET
+ *
+ */
+@InterfaceAudience.Private
+public class JobTokens implements Writable {
+ /**
+ * file name used on HDFS for generated keys
+ */
+ public static final String JOB_TOKEN_FILENAME = "jobTokens";
+
+ private byte [] shuffleJobToken = null; // jobtoken for shuffle (map output)
+
+
+ /**
+ * returns the shuffle job token key
+ * @return the shuffle key bytes
+ */
+ public byte[] getShuffleJobToken() {
+ return shuffleJobToken;
+ }
+
+ /**
+ * sets the jobToken
+ * @param key
+ */
+ public void setShuffleJobToken(byte[] key) {
+ shuffleJobToken = key;
+ }
+
+ /**
+ * stores all the keys to DataOutput
+ * @param out
+ * @throws IOException
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeCompressedByteArray(out, shuffleJobToken);
+ }
+
+ /**
+ * loads all the keys
+ * @param in
+ * @throws IOException
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ shuffleJobToken = WritableUtils.readCompressedByteArray(in);
+ }
+}
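
Since JobTokens is a plain Writable, the save/load cycle that JobInProgress.generateJobTokens() and Child.loadJobTokens() perform in this patch boils down to the usual write()/readFields() pair. A minimal local round trip, assuming a scratch file name ("jobTokens.demo") of my choosing rather than the HDFS paths the patch actually uses:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.apache.hadoop.mapreduce.security.JobTokens;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class JobTokensRoundTrip {
      public static void main(String[] args) throws Exception {
        // JobTracker side: generate the shuffle key and persist it.
        JobTokens jt = new JobTokens();
        jt.setShuffleJobToken(SecureShuffleUtils.getNewEncodedKey());
        DataOutputStream out = new DataOutputStream(new FileOutputStream("jobTokens.demo"));
        jt.write(out);
        out.close();

        // Task side: load the same bytes back before starting the shuffle.
        JobTokens loaded = new JobTokens();
        DataInputStream in = new DataInputStream(new FileInputStream("jobTokens.demo"));
        loaded.readFields(in);
        in.close();
        System.out.println("key length = " + loaded.getShuffleJobToken().length);
      }
    }
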
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=882790&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Nov 20 23:43:45 2009
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.record.Utils;
+
+/**
+ *
+ * utilities for generating keys and hashes, and verifying them, for the shuffle
+ *
+ */
+@InterfaceAudience.Private
+public class SecureShuffleUtils {
+ public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+ public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+ public static KeyGenerator kg = null;
+ public static String DEFAULT_ALG="HmacSHA1";
+
+ private SecretKeySpec secretKey;
+ private Mac mac;
+
+ /**
+ * statically generate a new encoded key
+ * @return new encoded key
+ * @throws NoSuchAlgorithmException
+ */
+ public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
+ SecretKeySpec key = generateKey(DEFAULT_ALG);
+ return key.getEncoded();
+ }
+
+ private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
+ if(kg==null) {
+ kg = KeyGenerator.getInstance(alg);
+ }
+ return (SecretKeySpec) kg.generateKey();
+ }
+
+ /**
+ * Create a util object with alg and key
+ * @param sKeyEncoded
+ * @throws IOException if the MAC algorithm is unavailable or the key is invalid
+ */
+ public SecureShuffleUtils(byte [] sKeyEncoded)
+ throws IOException{
+ secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
+ try {
+ mac = Mac.getInstance(DEFAULT_ALG);
+ mac.init(secretKey);
+ } catch (NoSuchAlgorithmException nae) {
+ throw new IOException(nae);
+ } catch( InvalidKeyException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ /**
+ * get key as byte[]
+ * @return encoded key
+ */
+ public byte [] getEncodedKey() {
+ return secretKey.getEncoded();
+ }
+
+ /**
+ * Base64 encoded hash of msg
+ * @param msg
+ */
+ public String generateHash(byte[] msg) {
+ return new String(Base64.encodeBase64(generateByteHash(msg)));
+ }
+
+ /**
+ * calculate hash of msg
+ * @param msg
+ * @return the raw HMAC bytes of msg
+ */
+ private byte[] generateByteHash(byte[] msg) {
+ return mac.doFinal(msg);
+ }
+
+ /**
+ * verify that hash equals HMacHash(msg)
+ * @param hash
+ * @param msg
+ * @return true if they are the same
+ */
+ private boolean verifyHash(byte[] hash, byte[] msg) {
+ byte[] msg_hash = generateByteHash(msg);
+ return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+ }
+
+ /**
+ * Aux util to calculate hash of a String
+ * @param enc_str
+ * @return Base64 encodedHash
+ * @throws IOException
+ */
+ public String hashFromString(String enc_str)
+ throws IOException {
+ return generateHash(enc_str.getBytes());
+ }
+
+ /**
+ * verify that base64Hash is same as HMacHash(msg)
+ * @param base64Hash (Base64 encoded hash)
+ * @param msg
+ * @throws IOException if not the same
+ */
+ public void verifyReply(String base64Hash, String msg)
+ throws IOException {
+ byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+
+ boolean res = verifyHash(hash, msg.getBytes());
+
+ if(res != true) {
+ throw new IOException("Verification of the hashReply failed");
+ }
+ }
+
+ /**
+ * Shuffle specific utils - build string for encoding from URL
+ * @param url
+ * @return string for encoding
+ */
+ public static String buildMsgFrom(URL url) {
+ return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+ }
+ /**
+ * Shuffle specific utils - build string for encoding from URL
+ * @param request
+ * @return string for encoding
+ */
+ public static String buildMsgFrom(HttpServletRequest request ) {
+ return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+ request.getLocalPort());
+ }
+ /**
+ * Shuffle specific utils - build string for encoding from URL
+ * @param uri_path
+ * @param uri_query
+ * @return string for encoding
+ */
+ private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+ return String.valueOf(port) + uri_path + "?" + uri_query;
+ }
+
+
+ /**
+ * byte array to Hex String
+ * @param ba
+ * @return string with HEX value of the key
+ */
+ public static String toHex(byte[] ba) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ for(byte b: ba) {
+ ps.printf("%x", b);
+ }
+ return baos.toString();
+ }
+}
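
A usage note on SecureShuffleUtils: both ends must hash exactly the same string, which buildMsgFrom() canonicalizes as port + path + "?" + query (the fetcher derives it from url.getPort(), the servlet from request.getLocalPort()). The short round trip below exercises the public API end to end; the host name and query values are made up for illustration:

    import java.net.URL;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class SecureShuffleUtilsDemo {
      public static void main(String[] args) throws Exception {
        // A fresh HmacSHA1 key, as generateJobTokens() would create.
        byte[] key = SecureShuffleUtils.getNewEncodedKey();
        SecureShuffleUtils ssutil = new SecureShuffleUtils(key);

        // Host name is made up; only port, path and query enter the hash.
        URL url = new URL("http://tt.example.com:8080/mapOutput?job=j1&reduce=0&map=m1");
        String msg = SecureShuffleUtils.buildMsgFrom(url); // "8080/mapOutput?job=j1&reduce=0&map=m1"

        String hash = ssutil.hashFromString(msg); // what the fetcher sends as UrlHash
        ssutil.verifyReply(hash, msg);            // what the servlet checks; throws IOException on mismatch
        System.out.println("hash verified");
      }
    }
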
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Nov 20 23:43:45 2009
@@ -41,10 +41,15 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.commons.codec.binary.Base64;
+import java.security.GeneralSecurityException;
+
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -83,11 +88,12 @@
// Decompression of map-outputs
private final CompressionCodec codec;
private final Decompressor decompressor;
+ private final byte[] shuffleJobToken;
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
- ExceptionReporter exceptionReporter) {
+ ExceptionReporter exceptionReporter, byte [] shuffleJobToken) {
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -95,6 +101,7 @@
this.exceptionReporter = exceptionReporter;
this.id = ++nextId;
this.reduce = reduceId.getTaskID().getId();
+ this.shuffleJobToken = shuffleJobToken;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -185,12 +192,31 @@
boolean connectSucceeded = false;
try {
- URLConnection connection = getMapOutputURL(host, maps).openConnection();
+ URL url = getMapOutputURL(host, maps);
+ URLConnection connection = url.openConnection();
+
+ // generate hash of the url
+ SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
+ String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+ String encHash = ssutil.hashFromString(msgToEncode);
+
+ // put url hash into http header
+ connection.addRequestProperty(
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
connectSucceeded = true;
input =
new DataInputStream(getInputStream(connection, connectionTimeout,
readTimeout));
-
+
+ // get the replyHash which is HMac of the encHash we sent to the server
+ String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+ if(replyHash==null) {
+ throw new IOException("security validation of TT Map output failed");
+ }
+ LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
+ // verify that replyHash is HMac of encHash
+ ssutil.verifyReply(replyHash, encHash);
+ LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Fri Nov 20 23:43:45 2009
@@ -93,7 +93,7 @@
mergedMapOutputsCounter,
this, mergePhase);
}
-
+
@SuppressWarnings("unchecked")
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Start the map-completion events fetcher thread
@@ -106,7 +106,8 @@
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
- reporter, metrics, this);
+ reporter, metrics, this,
+ reduceTask.getJobTokens().getShuffleJobToken());
fetchers[i].start();
}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=882790&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java Fri Nov 20 23:43:45 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+public class TestShuffleJobToken {
+ private static HttpServer server;
+ private static URL baseUrl;
+ private static File dir;
+ private static final String JOB_ID = "job_20091117075357176_0001";
+
+ // create fake url
+ private URL getMapOutputURL(String host) throws MalformedURLException {
+ // Get the base url
+ StringBuffer url = new StringBuffer(host);
+ url.append("mapOutput?");
+ url.append("job=" + JOB_ID + "&");
+ url.append("reduce=0&");
+ url.append("map=attempt");
+
+ return new URL(url.toString());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ dir = new File(System.getProperty("build.webapps", "build/webapps") + "/test");
+ System.out.println("dir="+dir.getAbsolutePath());
+ if(!dir.exists()) {
+ assertTrue(dir.mkdirs());
+ }
+ server = new HttpServer("test", "0.0.0.0", 0, true);
+ server.addServlet("shuffle", "/mapOutput", TaskTracker.MapOutputServlet.class);
+ server.start();
+ int port = server.getPort();
+ baseUrl = new URL("http://localhost:" + port + "/");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if(dir.exists())
+ dir.delete();
+ if(server!=null)
+ server.stop();
+ }
+
+
+ /**
+ * try the positive case (valid urlHash) and the negative case (invalid urlHash)
+ */
+ @Test
+ public void testInvalidJobToken()
+ throws IOException, GeneralSecurityException {
+
+ URL url = getMapOutputURL(baseUrl.toString());
+ String enc_str = SecureShuffleUtils.buildMsgFrom(url);
+ URLConnection connectionGood = url.openConnection();
+
+ // create key
+ byte [] key= SecureShuffleUtils.getNewEncodedKey();
+
+ // create fake TaskTracker - needed for keys storage
+ JobTokens jt = new JobTokens();
+ jt.setShuffleJobToken(key);
+ TaskTracker tt = new TaskTracker();
+ addJobToken(tt, JOB_ID, jt); // fake id
+ server.setAttribute("task.tracker", tt);
+
+ // encode the url
+ SecureShuffleUtils mac = new SecureShuffleUtils(key);
+ String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+
+ // another key, to produce an invalid hash
+ byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
+ mac = new SecureShuffleUtils(badKey);
+ String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash
+
+ // put url hash into http header
+ connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
+
+ // valid url hash should not fail with security error
+ try {
+ connectionGood.getInputStream();
+ } catch (IOException ie) {
+ String msg = ie.getLocalizedMessage();
+ if(msg.contains("Server returned HTTP response code: 401 for URL:")) {
+ fail("securtity failure with valid urlHash:"+ie);
+ }
+ System.out.println("valid urlhash passed validation");
+ }
+ // invalid url hash
+ URLConnection connectionBad = url.openConnection();
+ connectionBad.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashBad);
+
+ try {
+ connectionBad.getInputStream();
+ fail("Connection should've failed because of invalid urlHash");
+ } catch (IOException ie) {
+ String msg = ie.getLocalizedMessage();
+ if(!msg.contains("Server returned HTTP response code: 401 for URL:")) {
+ fail("connection failed with other then validation error:"+ie);
+ }
+ System.out.println("validation worked, failed with:"+ie);
+ }
+ }
+ /* Note: this helper exists only for this unit test; it registers a fake RunningJob with its JobTokens on the TaskTracker. */
+ void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+ JobID jobId = JobID.forName(jobIdStr);
+ TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
+ rJob.jobTokens = jt;
+ synchronized (tt.runningJobs) {
+ tt.runningJobs.put(jobId, rJob);
+ }
+ }
+
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Nov 20 23:43:45 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
@@ -140,14 +141,15 @@
// JobClient uploads the jobConf to the file system.
File jobConfFile = uploadJobConf(job.getConfiguration());
-
- // Set up the TaskTracker
+
+ // Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
// for test case system FS is the local FS
tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
-
+ tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
+
taskTrackerUGI = UserGroupInformation.login(trackerFConf);
// Set up the task to be localized
@@ -159,6 +161,10 @@
new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
+ // create jobTokens file
+ uploadJobTokensFile();
+
+
taskController = new DefaultTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
@@ -204,6 +210,25 @@
out.close();
return jobConfFile;
}
+
+ /**
+ * create a fake JobTokens file
+ * @throws IOException
+ */
+ protected void uploadJobTokensFile() throws IOException {
+
+ File dir = new File(TEST_ROOT_DIR, jobId.toString());
+ if(!dir.exists())
+ assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
+
+ File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+ FileOutputStream fos = new FileOutputStream(jobTokenFile);
+ java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
+ JobTokens jt = new JobTokens();
+ jt.write(out); // writing an empty file; we don't need the keys for this test
+ out.close();
+ }
@Override
protected void tearDown()