Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/11/21 00:43:46 UTC

svn commit: r882790 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/ src/contrib/streaming/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/security/ src/java/org/apache/hado...

Author: ddas
Date: Fri Nov 20 23:43:45 2009
New Revision: 882790

URL: http://svn.apache.org/viewvc?rev=882790&view=rev
Log:
MAPREDUCE-1026. Adds mutual authentication of shuffle transfers using a shared JobTracker-generated key. Contributed by Boris Shkolnik.
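
The change amounts to an HMAC-SHA1 challenge/response over the shuffle URL: the
fetcher hashes the map-output request with the shared job key and sends it in a
UrlHash header; the TaskTracker recomputes the hash to authenticate the fetcher,
then returns a hash of that hash in a ReplyHash header so the fetcher can in turn
authenticate the server. A minimal standalone sketch of that exchange (the URL
string is made up, and java.util.Base64 stands in for the commons-codec class the
patch itself uses):

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import javax.crypto.KeyGenerator;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;

    public class ShuffleAuthSketch {
      public static void main(String[] args) throws Exception {
        // shared secret, generated once per job on the JobTracker
        byte[] jobKey = KeyGenerator.getInstance("HmacSHA1").generateKey().getEncoded();

        // fetcher: hash the message built from the map-output URL (UrlHash header)
        String msg = "8080/mapOutput?job=job_x&reduce=0&map=attempt_y"; // illustrative
        String urlHash = hmac(jobKey, msg);

        // TaskTracker: recompute the hash; a mismatch means the fetcher
        // does not hold the job key (answered with HTTP 401)
        if (!hmac(jobKey, msg).equals(urlHash))
          throw new SecurityException("fetcher cannot be authenticated");
        // on success, reply with a hash of the fetcher's hash (ReplyHash header)
        String replyHash = hmac(jobKey, urlHash);

        // fetcher: verify the reply, proving the server also holds the key
        if (!hmac(jobKey, urlHash).equals(replyHash))
          throw new SecurityException("server cannot be authenticated");
        System.out.println("mutual authentication succeeded");
      }

      private static String hmac(byte[] key, String msg) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(key, "HmacSHA1"));
        return Base64.getEncoder()
            .encodeToString(mac.doFinal(msg.getBytes(StandardCharsets.UTF_8)));
      }
    }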

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 20 23:43:45 2009
@@ -9,6 +9,10 @@
     MAPREDUCE-1017. Compression and output splitting for Sqoop.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-1026. Adds mutual authentication of shuffle
+    transfers using a shared JobTracker-generated key.
+    (Boris Shkolnik via ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml Fri Nov 20 23:43:45 2009
@@ -80,5 +80,9 @@
       name="paranamer"
       rev="${paranamer.version}"
       conf="common->default"/>
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml Fri Nov 20 23:43:45 2009
@@ -80,5 +80,9 @@
       name="paranamer"
       rev="${paranamer.version}"
       conf="common->default"/>
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="common->default"/>
     </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Fri Nov 20 23:43:45 2009
@@ -26,11 +26,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
@@ -65,6 +69,13 @@
     int jvmIdInt = Integer.parseInt(args[3]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),
         firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
+    
+    // the file name is passed through the environment
+    String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
+    FileSystem localFs = FileSystem.getLocal(defaultConf);
+    JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+    LOG.debug("Child: got jobTokenFile=" + jobTokenFile);
+    
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
           TaskUmbilicalProtocol.versionID,
@@ -141,7 +152,10 @@
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
-
+        
+        // set the job tokens on the task
+        task.setJobTokens(jt);
+        
         // setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
         // can only see files under the attempt dir.
         TaskRunner.setupChildMapredLocalDirs(task, job);
@@ -210,4 +224,22 @@
       LogManager.shutdown();
     }
   }
+  
+  /**
+   * load secret keys from a file
+   * @param jobTokenFile
+   * @param localFS
+   * @throws IOException
+   */
+  private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS) 
+  throws IOException {
+    Path localJobTokenFile = new Path(jobTokenFile);
+    FSDataInputStream in = localFS.open(localJobTokenFile);
+    JobTokens jt = new JobTokens();
+    jt.readFields(in);
+        
+    LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
+    in.close();
+    return jt;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Nov 20 23:43:45 2009
@@ -63,6 +63,8 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -71,6 +73,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -576,6 +579,11 @@
     setPriority(this.priority);
     
     //
+    // generate security keys needed by Tasks
+    //
+    generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+    
+    //
     // read input splits and create a map per a split
     //
     String jobFile = profile.getJobFile();
@@ -3505,4 +3513,30 @@
       LOG.debug("Failed to delete file " + f);
     }
   }
+  
+  /**
+   * generate keys and save them to a file
+   * @param jobDir
+   * @throws IOException
+   */
+  private void generateJobTokens(Path jobDir) throws IOException{
+    Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+    FSDataOutputStream os = fs.create(keysFile);
+    //create JobTokens file and add key to it
+    JobTokens jt = new JobTokens();
+    byte [] key;
+    try {
+      // new key
+      key = SecureShuffleUtils.getNewEncodedKey();
+    } catch (java.security.GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    // remember the key 
+    jt.setShuffleJobToken(key);
+    // other keys..
+    jt.write(os);
+    os.close();
+    LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+  }
+
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Nov 20 23:43:45 2009
@@ -21,8 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -51,12 +49,15 @@
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 
 /** 
  * Base class for tasks.
@@ -159,6 +160,7 @@
   protected final Counters.Counter mergedMapOutputsCounter;
   private int numSlotsRequired;
   protected TaskUmbilicalProtocol umbilical;
+  protected JobTokens jobTokens = null; // storage for the secret keys
 
   ////////////////////////////////////////////
   // Constructors
@@ -211,6 +213,23 @@
   public JobID getJobID() {
     return taskId.getJobID();
   }
+
+  /**
+   * set JobToken storage 
+   * @param jt
+   */
+  public void setJobTokens(JobTokens jt) {
+    this.jobTokens = jt;
+  }
+
+  /**
+   * get JobToken storage
+   * @return storage object
+   */
+  public JobTokens getJobTokens() {
+    return this.jobTokens;
+  }
+
   
   /**
    * Get the index of this task within the job.
@@ -1299,7 +1318,6 @@
                                                 reporter, comparator, keyClass,
                                                 valueClass);
       reducer.run(reducerContext);
-    }
-    
+    } 
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Nov 20 23:43:45 2009
@@ -32,6 +32,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
@@ -505,6 +506,11 @@
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     
+    // put jobTokenFile name into env
+    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+    env.put("JOB_TOKEN_FILE", jobTokenFile);
+    
     // for the child of task jvm, set hadoop.root.logger
     env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
     String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Nov 20 23:43:45 2009
@@ -73,6 +73,8 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -186,7 +188,7 @@
    * Map from taskId -> TaskInProgress.
    */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
-  Map<JobID, RunningJob> runningJobs = null;
+  Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
 
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
@@ -215,6 +217,7 @@
   private static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.dta";
   static final String JOBFILE = "job.xml";
+  static final String JOB_TOKEN_FILE = "jobToken"; // localized file
 
   static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;
 
@@ -443,6 +446,11 @@
   static String getLocalJobConfFile(String user, String jobid) {
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
+  
+  static String getLocalJobTokenFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+  }
+
 
   static String getTaskConfFile(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
@@ -873,6 +881,12 @@
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
+        FSDataInputStream in = localFs.open(new Path(
+            rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
+        JobTokens jt = new JobTokens();
+        jt.readFields(in); 
+        rjob.jobTokens = jt; // store JobToken object per job
+        
         rjob.localized = true;
       }
     }
@@ -924,6 +938,8 @@
 
     // Download the job.jar for this job from the system FS
     localizeJobJarFile(userName, jobId, localFs, localJobConf);
+    // save local copy of JobToken file
+    localizeJobTokenFile(userName, jobId, localJobConf);
     return localJobConf;
   }
 
@@ -2879,6 +2895,7 @@
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
+    JobTokens jobTokens;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3067,6 +3084,8 @@
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
 
+      verifyRequest(request, response, tracker, jobId);
+      
       int numMaps = 0;
       try {
         shuffleMetrics.serverHandlerBusy();
@@ -3230,7 +3249,58 @@
           " from map: " + mapId + " given " + info.partLength + "/" + 
           info.rawLength);
     }
+    
+    /**
+     * verify that the request has the correct hash for the URL,
+     * and add a field to the reply header with a hash of that hash
+     * @param request
+     * @param response
+     * @param jobId the job whose shuffle token is used for verification
+     * @throws IOException
+     */
+    private void verifyRequest(HttpServletRequest request, 
+        HttpServletResponse response, TaskTracker tracker, String jobId) 
+    throws IOException {
+      JobTokens jt = null;
+      synchronized (tracker.runningJobs) {
+        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+        if (rjob == null) {
+          throw new IOException("Unknown job " + jobId + "!!");
+        }
+        jt = rjob.jobTokens;
+      }
+      // the string the fetcher hashed, rebuilt from the request
+      String enc_str = SecureShuffleUtils.buildMsgFrom(request);
+      
+      // hash from the fetcher
+      String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if(urlHashStr == null) {
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      int len = urlHashStr.length();
+      LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
+          urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
+
+      SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
+      // verify - throws exception
+      try {
+        ssutil.verifyReply(urlHashStr, enc_str);
+      } catch (IOException ioe) {
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw ioe;
+      }
+      
+      // verification passed - encode the reply
+      String reply = ssutil.generateHash(urlHashStr.getBytes());
+      response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      
+      len = reply.length();
+      LOG.debug("Fetcher request verfied. enc_str="+enc_str+";reply="
+          +reply.substring(len-len/2, len-1));
+    }
   }
+  
 
   // get the full paths of the directory in all the local disks.
   private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
@@ -3444,4 +3514,37 @@
   TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
     return distributedCacheManager;
   }
+  
+  /**
+   * Download the job-token file from the system FS and save it on the local FS;
+   * the local path is stored in the jobConf under JobContext.JOB_TOKEN_FILE.
+   * @param user
+   * @param jobId
+   * @param jobConf
+   * @throws IOException
+   */
+  private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+      throws IOException {
+    // check that the job-token file is there
+    Path skPath = new Path(systemDirectory,
+        jobId.toString() + "/" + JobTokens.JOB_TOKEN_FILENAME);
+
+    FileStatus status = systemFS.getFileStatus(skPath); // throws FileNotFoundException
+    long jobTokenSize = status.getLen();
+
+    Path localJobTokenFile =
+        lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
+            jobId.toString()), jobTokenSize, fConf);
+
+    LOG.debug("localizing job-token file from sd=" + skPath.toUri().getPath() +
+        " to " + localJobTokenFile.toUri().getPath());
+
+    // download the job-token file
+    systemFS.copyToLocalFile(skPath, localJobTokenFile);
+    // store the local path in jobConf so TaskRunner can pass it to the child
+    jobConf.set(JobContext.JOB_TOKEN_FILE, localJobTokenFile.toString());
+  }
+
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Nov 20 23:43:45 2009
@@ -220,6 +220,7 @@
     "mapreduce.reduce.merge.memtomem.threshold";
   public static final String REDUCE_MEMTOMEM_ENABLED = 
     "mapreduce.reduce.merge.memtomem.enabled";
+  public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
 
   /**
    * Return the configuration for the job.

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java?rev=882790&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/JobTokens.java Fri Nov 20 23:43:45 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Stores and loads a job's security keys.
+ * Each key's value is a byte[];
+ * the set is serialized to/from DataOutput/DataInput.
+ * Keys currently stored:
+ *  jobToken for the secure shuffle HTTP GET
+ *
+ */
+@InterfaceAudience.Private
+public class JobTokens implements Writable {
+  /**
+   * file name used on HDFS for generated keys
+   */
+  public static final String JOB_TOKEN_FILENAME = "jobTokens";
+
+  private byte [] shuffleJobToken = null; // jobtoken for shuffle (map output)
+
+  
+  /**
+   * returns the shuffle job-token key
+   * @return the key as a byte[]
+   */
+  public byte[] getShuffleJobToken() {
+    return shuffleJobToken;
+  }
+  
+  /**
+   * sets the jobToken
+   * @param key
+   */
+  public void setShuffleJobToken(byte[] key) {
+    shuffleJobToken = key;
+  }
+  
+  /**
+   * stores all the keys to DataOutput
+   * @param out
+   * @throws IOException
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeCompressedByteArray(out, shuffleJobToken);
+  }
+  
+  /**
+   * loads all the keys
+   * @param in
+   * @throws IOException
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shuffleJobToken = WritableUtils.readCompressedByteArray(in);
+  }
+}
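
Since JobTokens is a plain Writable, the jobTokens file format is exactly what
write/readFields produce. A quick round-trip sketch (assuming the class above is
on the classpath) of the serialization JobInProgress performs and that
Child/TaskTracker later reverse:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;
    import org.apache.hadoop.mapreduce.security.JobTokens;

    public class JobTokensRoundTrip {
      public static void main(String[] args) throws Exception {
        JobTokens written = new JobTokens();
        written.setShuffleJobToken(new byte[] {1, 2, 3}); // toy key

        // serialize, as JobInProgress.generateJobTokens does into the jobTokens file
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        written.write(new DataOutputStream(buf));

        // deserialize, as Child.loadJobTokens / TaskTracker localization do
        JobTokens read = new JobTokens();
        read.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(Arrays.equals(written.getShuffleJobToken(),
                                         read.getShuffleJobToken())); // true
      }
    }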

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=882790&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Nov 20 23:43:45 2009
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.record.Utils;
+
+/**
+ * 
+ * utilities for generating keys and hashes, and verifying them, for the shuffle
+ *
+ */
+@InterfaceAudience.Private
+public class SecureShuffleUtils {
+  public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+  public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+  public static KeyGenerator kg = null;
+  public static String DEFAULT_ALG="HmacSHA1";
+  
+  private SecretKeySpec secretKey;
+  private Mac mac;
+  
+  /**
+   * statically generate a new key
+   * @return new encoded key
+   * @throws NoSuchAlgorithmException
+   */
+  public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
+    SecretKeySpec key = generateKey(DEFAULT_ALG);
+    return key.getEncoded();
+  }
+  
+  private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
+    if(kg==null) {
+      kg = KeyGenerator.getInstance(alg);
+    }
+    return (SecretKeySpec) kg.generateKey();
+  }
+
+  /**
+   * Create a util object with the default algorithm and the given key
+   * @param sKeyEncoded
+   * @throws IOException if the algorithm is unavailable
+   *                     or the key is invalid
+   */
+  public SecureShuffleUtils(byte [] sKeyEncoded) 
+  throws  IOException{
+    secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
+    try {
+      mac = Mac.getInstance(DEFAULT_ALG);
+      mac.init(secretKey);
+    } catch (NoSuchAlgorithmException nae) {
+      throw new IOException(nae);
+    } catch( InvalidKeyException ie) {
+      throw new IOException(ie);
+    }
+  }
+  
+  /** 
+   * get key as byte[]
+   * @return encoded key
+   */
+  public byte [] getEncodedKey() {
+    return secretKey.getEncoded();
+  }
+  
+  /**
+   * Base64 encoded hash of msg
+   * @param msg
+   */
+  public String generateHash(byte[] msg) {
+    return new String(Base64.encodeBase64(generateByteHash(msg)));
+  }
+  
+  /**
+   * calculate hash of msg
+   * @param msg
+   * @return
+   */
+  private byte[] generateByteHash(byte[] msg) {
+    return mac.doFinal(msg);
+  }
+  
+  /**
+   * verify that hash equals HMacHash(msg)
+   * @param hash
+   * @return true if they are the same
+   */
+  private boolean verifyHash(byte[] hash, byte[] msg) {
+    byte[] msg_hash = generateByteHash(msg);
+    return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+  }
+  
+  /**
+   * Aux util to calculate hash of a String
+   * @param enc_str
+   * @return Base64 encodedHash
+   * @throws IOException
+   */
+  public String hashFromString(String enc_str) 
+  throws IOException {
+    return generateHash(enc_str.getBytes()); 
+  }
+  
+  /**
+   * verify that base64Hash is the same as HMacHash(msg)
+   * @param base64Hash (Base64 encoded hash)
+   * @param msg
+   * @throws IOException if not the same
+   */
+  public void verifyReply(String base64Hash, String msg)
+  throws IOException {
+    byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+    
+    boolean res = verifyHash(hash, msg.getBytes());
+    
+    if (!res) {
+      throw new IOException("Verification of the hashReply failed");
+    }
+  }
+  
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param url
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(URL url) {
+    return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+  }
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param request
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(HttpServletRequest request ) {
+    return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+        request.getLocalPort());
+  }
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param uri_path
+   * @param uri_query
+   * @return string for encoding
+   */
+  private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+    return String.valueOf(port) + uri_path + "?" + uri_query;
+  }
+  
+  
+  /**
+   * byte array to Hex String
+   * @param ba
+   * @return string with HEX value of the key
+   */
+  public static String toHex(byte[] ba) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    for(byte b: ba) {
+      ps.printf("%x", b);
+    }
+    return baos.toString();
+  }
+}
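
A short usage sketch of the class above, exercising both ends of the exchange in
one process (the host and query values are made up):

    import java.net.URL;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class SecureShuffleUtilsDemo {
      public static void main(String[] args) throws Exception {
        byte[] key = SecureShuffleUtils.getNewEncodedKey();
        SecureShuffleUtils ssutil = new SecureShuffleUtils(key);

        // fetcher side: build the message from the map-output URL and hash it
        URL url = new URL("http://tt.example.com:8080/mapOutput?job=j&reduce=0&map=m");
        String msg = SecureShuffleUtils.buildMsgFrom(url); // "8080/mapOutput?job=..."
        String urlHash = ssutil.hashFromString(msg);

        // server side: recompute and verify; throws IOException on a mismatch
        ssutil.verifyReply(urlHash, msg);
        System.out.println("hash verified for " + msg);
      }
    }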

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Nov 20 23:43:45 2009
@@ -41,10 +41,15 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import org.apache.commons.codec.binary.Base64;
+import java.security.GeneralSecurityException;
+
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -83,11 +88,12 @@
   // Decompression of map-outputs
   private final CompressionCodec codec;
   private final Decompressor decompressor;
+  private final byte[] shuffleJobToken;
 
   public Fetcher(JobConf job, TaskAttemptID reduceId, 
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
-                 ExceptionReporter exceptionReporter) {
+                 ExceptionReporter exceptionReporter, byte [] shuffleJobToken) {
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -95,6 +101,7 @@
     this.exceptionReporter = exceptionReporter;
     this.id = ++nextId;
     this.reduce = reduceId.getTaskID().getId();
+    this.shuffleJobToken = shuffleJobToken;
     ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -185,12 +192,31 @@
     boolean connectSucceeded = false;
     
     try {
-      URLConnection connection = getMapOutputURL(host, maps).openConnection();
+      URL url = getMapOutputURL(host, maps);
+      URLConnection connection = url.openConnection();
+      
+      // generate hash of the url
+      SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
+      String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+      String encHash = ssutil.hashFromString(msgToEncode);
+      
+      // put url hash into http header
+      connection.addRequestProperty(
+          SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
       connectSucceeded = true;
       input = 
         new DataInputStream(getInputStream(connection, connectionTimeout,
                                            readTimeout));
-
+      
+      // get the replyHash which is HMac of the encHash we sent to the server
+      String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+      if(replyHash==null) {
+        throw new IOException("security validation of TT Map output failed");
+      }
+      LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
+      // verify that replyHash is HMac of encHash
+      ssutil.verifyReply(replyHash, encHash);
+      LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
       ioErrs.increment(1);
       LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Fri Nov 20 23:43:45 2009
@@ -93,7 +93,7 @@
                                     mergedMapOutputsCounter, 
                                     this, mergePhase);
   }
-  
+
   @SuppressWarnings("unchecked")
   public RawKeyValueIterator run() throws IOException, InterruptedException {
     // Start the map-completion events fetcher thread
@@ -106,7 +106,8 @@
     Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
     for (int i=0; i < numFetchers; ++i) {
       fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
-                                     reporter, metrics, this);
+                                     reporter, metrics, this, 
+                                     reduceTask.getJobTokens().getShuffleJobToken());
       fetchers[i].start();
     }
     

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=882790&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java Fri Nov 20 23:43:45 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+public class TestShuffleJobToken {
+  private static HttpServer server;
+  private static URL baseUrl;
+  private static File dir;
+  private static final String JOB_ID = "job_20091117075357176_0001";
+  
+  // create fake url
+  private URL getMapOutputURL(String host)  throws MalformedURLException {
+    // Get the base url
+    StringBuffer url = new StringBuffer(host);
+    url.append("mapOutput?");
+    url.append("job=" + JOB_ID + "&");
+    url.append("reduce=0&");
+    url.append("map=attempt");
+
+    return new URL(url.toString());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    dir = new File(System.getProperty("build.webapps", "build/webapps") + "/test");
+    System.out.println("dir="+dir.getAbsolutePath());
+    if(!dir.exists()) {
+      assertTrue(dir.mkdirs());
+    }
+    server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.addServlet("shuffle", "/mapOutput", TaskTracker.MapOutputServlet.class);
+    server.start();
+    int port = server.getPort();
+    baseUrl = new URL("http://localhost:" + port + "/");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(dir.exists())
+      dir.delete();
+    if(server!=null)
+      server.stop();
+  }
+
+  
+  /**
+   * try the positive and negative cases: valid and invalid urlHash
+   */
+  @Test
+  public void testInvalidJobToken()
+  throws IOException, GeneralSecurityException {
+    
+    URL url = getMapOutputURL(baseUrl.toString());
+    String enc_str = SecureShuffleUtils.buildMsgFrom(url);
+    URLConnection connectionGood = url.openConnection();
+
+    // create key 
+    byte [] key= SecureShuffleUtils.getNewEncodedKey();
+    
+    // create fake TaskTracker - needed for keys storage
+    JobTokens jt = new JobTokens();
+    jt.setShuffleJobToken(key);
+    TaskTracker tt  = new TaskTracker();
+    addJobToken(tt, JOB_ID, jt); // fake id
+    server.setAttribute("task.tracker", tt);
+
+    // encode the url
+    SecureShuffleUtils mac = new SecureShuffleUtils(key);
+    String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+    
+    // another key, to produce an invalid hash
+    byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
+    mac = new SecureShuffleUtils(badKey);
+    String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash 
+    
+    // put url hash into http header
+    connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
+    
+    // valid url hash should not fail with security error
+    try {
+      connectionGood.getInputStream();
+    } catch (IOException ie) {
+      String msg = ie.getLocalizedMessage();
+      if(msg.contains("Server returned HTTP response code: 401 for URL:")) {
+        fail("securtity failure with valid urlHash:"+ie);
+      }
+      System.out.println("valid urlhash passed validation");
+    } 
+    // invalid url hash
+    URLConnection connectionBad = url.openConnection();
+    connectionBad.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashBad);
+    
+    try {
+      connectionBad.getInputStream();
+      fail("Connection should've failed because of invalid urlHash");
+    } catch (IOException ie) {
+      String msg = ie.getLocalizedMessage();
+      if(!msg.contains("Server returned HTTP response code: 401 for URL:")) {
+        fail("connection failed with other then validation error:"+ie);
+      }
+      System.out.println("validation worked, failed with:"+ie);
+    } 
+  }
+  /* helper: register a fake RunningJob carrying the given JobTokens with the TaskTracker */
+  void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+    JobID jobId = JobID.forName(jobIdStr);
+    TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
+    rJob.jobTokens = jt;
+    synchronized (tt.runningJobs) {
+      tt.runningJobs.put(jobId, rJob);
+    }
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=882790&r1=882789&r2=882790&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Nov 20 23:43:45 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
@@ -140,14 +141,15 @@
 
     // JobClient uploads the jobConf to the file system.
     File jobConfFile = uploadJobConf(job.getConfiguration());
-
-    // Set up the TaskTracker
+    
+    // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
 
     // for test case system FS is the local FS
     tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
-
+    tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
+    
     taskTrackerUGI = UserGroupInformation.login(trackerFConf);
 
     // Set up the task to be localized
@@ -159,6 +161,10 @@
         new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
     task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
 
+    // create jobTokens file
+    uploadJobTokensFile(); 
+    
+    
     taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
@@ -204,6 +210,25 @@
     out.close();
     return jobConfFile;
   }
+  
+  /**
+   * create a fake JobTokens file under TEST_ROOT_DIR
+   * @throws IOException
+   */
+  protected void uploadJobTokensFile() throws IOException {
+    
+    File dir = new File(TEST_ROOT_DIR, jobId.toString());
+    if(!dir.exists())
+      assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
+    
+    File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+    FileOutputStream fos = new FileOutputStream(jobTokenFile);
+    java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
+    JobTokens jt = new JobTokens();
+    jt.write(out); // write an empty file; we don't need the keys for this test
+    out.close();
+  }
 
   @Override
   protected void tearDown()