You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/04/12 03:14:45 UTC

svn commit: r1091273 - in /hadoop/common/branches/branch-0.20-security: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapreduce/security/

Author: acmurthy
Date: Tue Apr 12 01:14:44 2011
New Revision: 1091273

URL: http://svn.apache.org/viewvc?rev=1091273&view=rev
Log:
MAPREDUCE-2429. Validate JVM in TaskUmbilicalProtocol. Contributed by Siddharth Seth.

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Apr 12 01:14:44 2011
@@ -4,6 +4,9 @@ Release 0.20.204.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-2429. Validate JVM in TaskUmbilicalProtocol. (Siddharth Seth via
+    acmurthy) 
+
     MAPREDUCE-2418. Show job errors in JobHistory page. (Siddharth Seth via
     acmurthy) 
 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java Tue Apr 12 01:14:44 2011
@@ -169,6 +169,7 @@ class Child {
     
     UserGroupInformation childUGI = null;
 
+    final JvmContext jvmContext = context;
     try {
       while (true) {
         taskid = null;
@@ -250,13 +251,14 @@ class Child {
         
         // Create a final reference to the task for the doAs block
         final Task taskFinal = task;
+        taskFinal.setJvmContext(jvmContext);
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
             try {
               // use job-specified working directory
               FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
-              taskFinal.run(job, umbilical);             // run the task
+              taskFinal.run(job, umbilical);        // run the task
             } finally {
               TaskLog.syncLogs
                 (logLocation, taskid, isCleanup, logIsSegmented(job));
@@ -275,7 +277,7 @@ class Child {
       }
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
-      umbilical.fsError(taskid, e.getMessage());
+      umbilical.fsError(taskid, e.getMessage(), jvmContext);
     } catch (Exception exception) {
       LOG.warn("Error running child", exception);
       try {
@@ -301,7 +303,7 @@ class Child {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       exception.printStackTrace(new PrintStream(baos));
       if (taskid != null) {
-        umbilical.reportDiagnosticInfo(taskid, baos.toString());
+        umbilical.reportDiagnosticInfo(taskid, baos.toString(), jvmContext);
       }
     } catch (Throwable throwable) {
       LOG.fatal("Error running child : "
@@ -311,7 +313,7 @@ class Child {
         String cause = tCause == null 
                        ? throwable.getMessage() 
                        : StringUtils.stringifyException(tCause);
-        umbilical.fatalError(taskid, cause);
+        umbilical.fatalError(taskid, cause, jvmContext);
       }
     } finally {
       RPC.stopProxy(umbilical);

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Apr 12 01:14:44 2011
@@ -53,19 +53,23 @@ public class IsolationRunner {
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public void done(TaskAttemptID taskid) throws IOException {
+    public void done(TaskAttemptID taskid, JvmContext jvmContext)
+        throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }
 
-    public void fsError(TaskAttemptID taskId, String message) throws IOException {
+    public void fsError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
       LOG.info("Task " + taskId + " reporting file system error: " + message);
     }
 
-    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
+    public void shuffleError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
-    public void fatalError(TaskAttemptID taskId, String msg) throws IOException{
+    public void fatalError(TaskAttemptID taskId, String msg,
+        JvmContext jvmContext) throws IOException {
       LOG.info("Task " + taskId + " reporting fatal error: " + msg);
     }
 
@@ -73,20 +77,21 @@ public class IsolationRunner {
       return null;
     }
 
-    public boolean ping(TaskAttemptID taskid) throws IOException {
+    public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException {
       return true;
     }
 
-    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
-    throws IOException, InterruptedException {
-      statusUpdate(taskId, taskStatus);
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus,
+        JvmContext jvmContext) throws IOException, InterruptedException {
+      statusUpdate(taskId, taskStatus, jvmContext);
     }
     
-    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+    public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext)
+        throws IOException {
       return true;
     }
     
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context) 
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskId);
@@ -103,18 +108,20 @@ public class IsolationRunner {
       return true;
     }
 
-    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+        JvmContext jvmContext) throws IOException {
       LOG.info("Task " + taskid + " has problem " + trace);
     }
     
-    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
-        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
-      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, 
-                                               false);
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+        int fromEventId, int maxLocs, TaskAttemptID id, JvmContext jvmContext)
+        throws IOException {
+      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
+          false);
     }
 
     public void reportNextRecordRange(TaskAttemptID taskid, 
-        SortedRanges.Range range) throws IOException {
+        SortedRanges.Range range, JvmContext jvmContext) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java Tue Apr 12 01:14:44 2011
@@ -128,6 +128,14 @@ class JvmManager {
     }
   }
 
+  public boolean validateTipToJvm(TaskInProgress tip, JVMId jvmId) {
+    if (jvmId.isMapJVM()) {
+      return mapJvmManager.validateTipToJvm(tip, jvmId);
+    } else {
+      return reduceJvmManager.validateTipToJvm(tip, jvmId);
+    }
+  }
+
   public TaskInProgress getTaskForJvm(JVMId jvmId)
       throws IOException {
     if (jvmId.isMapJVM()) {
@@ -223,6 +231,23 @@ class JvmManager {
       jvmIdToRunner.get(jvmId).setBusy(true);
     }
     
+    synchronized public boolean validateTipToJvm(TaskInProgress tip, JVMId jvmId) {
+      if (jvmId == null) {
+        LOG.warn("Null jvmId. Cannot verify Jvm. validateTipToJvm returning false");
+        return false;
+      }
+      TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+      if (taskRunner == null) {
+        return false; //JvmId not known.
+      }
+      TaskInProgress knownTip = taskRunner.getTaskInProgress();
+      if (knownTip == tip) { // Valid to compare the addresses ? (or equals)
+        return true;
+      } else {
+        return false;
+      }
+    }
+
     synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
         throws IOException {
       if (jvmToRunningTask.containsKey(jvmId)) {

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Tue Apr 12 01:14:44 2011
@@ -311,7 +311,7 @@ class LocalJobRunner implements JobSubmi
 
     public JvmTask getTask(JvmContext context) { return null; }
 
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context) 
     throws IOException, InterruptedException {
       LOG.info(taskStatus.getStateString());
       float taskIndex = mapIds.indexOf(taskId);
@@ -333,9 +333,10 @@ class LocalJobRunner implements JobSubmi
      * and it is waiting for the commit Response
      */
     public void commitPending(TaskAttemptID taskid,
-                              TaskStatus taskStatus) 
+                              TaskStatus taskStatus,
+                              JvmContext jvmContext) 
     throws IOException, InterruptedException {
-      statusUpdate(taskid, taskStatus);
+      statusUpdate(taskid, taskStatus, jvmContext);
     }
 
     /**
@@ -347,51 +348,55 @@ class LocalJobRunner implements JobSubmi
       completedTaskCounters.incrAllCounters(task.getCounters());
     }
 
-    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+        JvmContext jvmContext) {
       // Ignore for now
     }
     
     public void reportNextRecordRange(TaskAttemptID taskid, 
-        SortedRanges.Range range) throws IOException {
+        SortedRanges.Range range, JvmContext jvmContext) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
 
-    public boolean ping(TaskAttemptID taskid) throws IOException {
+    public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException {
       return true;
     }
     
-    public boolean canCommit(TaskAttemptID taskid) 
+    public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) 
     throws IOException {
       return true;
     }
     
-    public void done(TaskAttemptID taskId) throws IOException {
+    public void done(TaskAttemptID taskId, JvmContext jvmContext)
+        throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
-      if (taskIndex >= 0) {                       // mapping
+      if (taskIndex >= 0) { // mapping
         status.setMapProgress(1.0f);
       } else {
         status.setReduceProgress(1.0f);
       }
     }
 
-    public synchronized void fsError(TaskAttemptID taskId, String message) 
-    throws IOException {
-      LOG.fatal("FSError: "+ message + "from task: " + taskId);
+    public synchronized void fsError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
+      LOG.fatal("FSError: " + message + "from task: " + taskId);
     }
 
-    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
-      LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
+    public void shuffleError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
+      LOG.fatal("shuffleError: " + message + "from task: " + taskId);
     }
     
-    public synchronized void fatalError(TaskAttemptID taskId, String msg) 
-    throws IOException {
-      LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+    public synchronized void fatalError(TaskAttemptID taskId, String msg,
+        JvmContext jvmContext) throws IOException {
+      LOG.fatal("Fatal: " + msg + "from task: " + taskId);
     }
     
-    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
-        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+        int fromEventId, int maxLocs, TaskAttemptID id, JvmContext jvmContext)
+        throws IOException {
       return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
-                                               false);
+          false);
     }
 
     @Override

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Apr 12 01:14:44 2011
@@ -341,12 +341,13 @@ class MapTask extends Task {
   }
 
   @Override
-  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
+  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
     throws IOException, ClassNotFoundException, InterruptedException {
     this.umbilical = umbilical;
 
     // start thread that will handle communication with parent
-    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
+        jvmContext);
     reporter.startCommunicationThread();
     boolean useNewApi = job.getUseNewMapper();
     initialize(job, getJobID(), reporter, useNewApi);

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Apr 12 01:14:44 2011
@@ -356,7 +356,8 @@ class ReduceTask extends Task {
       reducePhase = getProgress().addPhase("reduce");
     }
     // start thread that will handle communication with parent
-    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
+        jvmContext);
     reporter.startCommunicationThread();
     boolean useNewApi = job.getUseNewReducer();
     initialize(job, getJobID(), reporter, useNewApi);
@@ -1330,7 +1331,7 @@ class ReduceTask extends Task {
             LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " + 
                       StringUtils.stringifyException(e));
             try {
-              umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
+              umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
             } catch (IOException io) {
               LOG.error("Could not notify TT of FSError: " + 
                       StringUtils.stringifyException(io));
@@ -2299,7 +2300,7 @@ class ReduceTask extends Task {
                             "Killing task " + getTaskID() + ".");
                   umbilical.shuffleError(getTaskID(), 
                                          "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
-                                         + " bailing-out.");
+                                         + " bailing-out.", jvmContext);
                 }
               }
                 
@@ -2857,7 +2858,7 @@ class ReduceTask extends Task {
           umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
                                            fromEventId.get(), 
                                            MAX_EVENTS_TO_FETCH,
-                                           reduceTask.getTaskID());
+                                           reduceTask.getTaskID(), jvmContext);
         TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
           
         // Check if the reset is required.

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java Tue Apr 12 01:14:44 2011
@@ -154,6 +154,7 @@ abstract public class Task implements Wr
   private String pidFile = "";
   protected TaskUmbilicalProtocol umbilical;
   protected SecretKey tokenSecret;
+  protected JvmContext jvmContext;
 
   ////////////////////////////////////////////
   // Constructors
@@ -220,6 +221,21 @@ abstract public class Task implements Wr
     return this.tokenSecret;
   }
 
+  /**
+   * Set the task JvmContext
+   * @param jvmContext
+   */
+  public void setJvmContext(JvmContext jvmContext) {
+    this.jvmContext = jvmContext;
+  }
+  
+  /**
+   * Gets the task JvmContext
+   * @return the jvm context
+   */
+  public JvmContext getJvmContext() {
+    return this.jvmContext;
+  }
   
   /**
    * Get the index of this task within the job.
@@ -269,7 +285,7 @@ abstract public class Task implements Wr
                    ? StringUtils.stringifyException(throwable)
                    : StringUtils.stringifyException(tCause);
     try {
-      umbilical.fatalError(id, cause);
+      umbilical.fatalError(id, cause, jvmContext);
     } catch (IOException ioe) {
       LOG.fatal("Failed to contact the tasktracker", ioe);
       System.exit(-1);
@@ -446,8 +462,7 @@ abstract public class Task implements Wr
    * @param umbilical for progress reports
    */
   public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
-    throws IOException, ClassNotFoundException, InterruptedException;
-
+      throws IOException, ClassNotFoundException, InterruptedException;
 
   /** Return an approprate thread runner for this task. 
    * @param tip TODO*/
@@ -509,6 +524,7 @@ abstract public class Task implements Wr
     private TaskUmbilicalProtocol umbilical;
     private InputSplit split = null;
     private Progress taskProgress;
+    private JvmContext jvmContext;
     private Thread pingThread = null;
     private static final int PROGRESS_STATUS_LEN_LIMIT = 512;
     private boolean done = true;
@@ -522,9 +538,10 @@ abstract public class Task implements Wr
     private AtomicBoolean progressFlag = new AtomicBoolean(false);
     
     TaskReporter(Progress taskProgress,
-                 TaskUmbilicalProtocol umbilical) {
+                 TaskUmbilicalProtocol umbilical, JvmContext jvmContext) {
       this.umbilical = umbilical;
       this.taskProgress = taskProgress;
+      this.jvmContext = jvmContext;
     }
     // getters and setters for flag
     void setProgressFlag() {
@@ -630,12 +647,12 @@ abstract public class Task implements Wr
             taskStatus.statusUpdate(taskProgress.get(),
                                     taskProgress.toString(), 
                                     counters);
-            taskFound = umbilical.statusUpdate(taskId, taskStatus);
+            taskFound = umbilical.statusUpdate(taskId, taskStatus, jvmContext);
             taskStatus.clearStatus();
           }
           else {
             // send ping 
-            taskFound = umbilical.ping(taskId);
+            taskFound = umbilical.ping(taskId, jvmContext);
           }
 
           // if Task Tracker is not aware of our task ID (probably because it died and 
@@ -709,7 +726,7 @@ abstract public class Task implements Wr
     if (LOG.isDebugEnabled()) {
       LOG.debug("sending reportNextRecordRange " + range);
     }
-    umbilical.reportNextRecordRange(taskId, range);
+    umbilical.reportNextRecordRange(taskId, range, jvmContext);
   }
 
   /**
@@ -783,7 +800,7 @@ abstract public class Task implements Wr
       // say the task tracker that task is commit pending
       while (true) {
         try {
-          umbilical.commitPending(taskId, taskStatus);
+          umbilical.commitPending(taskId, taskStatus, jvmContext);
           break;
         } catch (InterruptedException ie) {
           // ignore
@@ -826,7 +843,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+        if (!umbilical.statusUpdate(getTaskID(), taskStatus, jvmContext)) {
           LOG.warn("Parent died.  Exiting "+taskId);
           System.exit(66);
         }
@@ -883,7 +900,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        umbilical.done(getTaskID());
+        umbilical.done(getTaskID(), jvmContext);
         LOG.info("Task '" + taskId + "' done.");
         return;
       } catch (IOException ie) {
@@ -903,7 +920,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        while (!umbilical.canCommit(taskId)) {
+        while (!umbilical.canCommit(taskId, jvmContext)) {
           try {
             Thread.sleep(1000);
           } catch(InterruptedException ie) {

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Apr 12 01:14:44 2011
@@ -2980,6 +2980,12 @@ public class TaskTracker implements MRCo
     }
   }
   
+  private void validateJVM(TaskInProgress tip, JvmContext jvmContext, TaskAttemptID taskid) throws IOException {
+    if (!jvmManager.validateTipToJvm(tip, jvmContext.jvmId)) {
+      throw new IOException("JvmValidate Failed. Ignoring request from task: " + taskid + ", with JvmId: " + jvmContext.jvmId);
+    }
+  }
+  
   private void authorizeJVM(org.apache.hadoop.mapreduce.JobID jobId) 
   throws IOException {
     String currentJobId = 
@@ -3039,11 +3045,13 @@ public class TaskTracker implements MRCo
    * Called periodically to report Task progress, from 0.0 to 1.0.
    */
   public synchronized boolean statusUpdate(TaskAttemptID taskid, 
-                                              TaskStatus taskStatus) 
+                                              TaskStatus taskStatus, 
+                                              JvmContext jvmContext) 
   throws IOException {
     authorizeJVM(taskid.getJobID());
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
+      validateJVM(tip, jvmContext, taskid);
       tip.reportProgress(taskStatus);
       return true;
     } else {
@@ -3056,9 +3064,16 @@ public class TaskTracker implements MRCo
    * Called when the task dies before completion, and we want to report back
    * diagnostic info
    */
-  public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
+  public synchronized void reportDiagnosticInfo(TaskAttemptID taskid,
+      String info, JvmContext jvmContext) throws IOException {
     authorizeJVM(taskid.getJobID());
-    reportDiagnosticInfoInternal(taskid, info);
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      validateJVM(tip, jvmContext, taskid);
+      tip.reportDiagnosticInfo(info);
+    } else {
+      LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
+    }
   }
   /**
    * Meant to be used internally
@@ -3077,10 +3092,11 @@ public class TaskTracker implements MRCo
   }
   
   public synchronized void reportNextRecordRange(TaskAttemptID taskid, 
-      SortedRanges.Range range) throws IOException {
+      SortedRanges.Range range, JvmContext jvmContext) throws IOException {
     authorizeJVM(taskid.getJobID());
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
+      validateJVM(tip, jvmContext, taskid);
       tip.reportNextRecordRange(range);
     } else {
       LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " +
@@ -3088,10 +3104,17 @@ public class TaskTracker implements MRCo
     }
   }
 
-  /** Child checking to see if we're alive.  Normally does nothing.*/
-  public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
+  /** Child checking to see if we're alive. Normally does nothing. */
+  public synchronized boolean ping(TaskAttemptID taskid, JvmContext jvmContext)
+      throws IOException {
     authorizeJVM(taskid.getJobID());
-    return tasks.get(taskid) != null;
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      validateJVM(tip, jvmContext, taskid);
+      return true;
+    } else {
+      return false;
+    }
   }
 
   /**
@@ -3099,33 +3122,38 @@ public class TaskTracker implements MRCo
    * and it is waiting for the commit Response
    */
   public synchronized void commitPending(TaskAttemptID taskid,
-                                         TaskStatus taskStatus) 
+                                         TaskStatus taskStatus,
+                                         JvmContext jvmContext) 
   throws IOException {
     authorizeJVM(taskid.getJobID());
     LOG.info("Task " + taskid + " is in commit-pending," +"" +
              " task state:" +taskStatus.getRunState());
-    statusUpdate(taskid, taskStatus);
+    // validateJVM is done in statusUpdate
+    statusUpdate(taskid, taskStatus, jvmContext);
     reportTaskFinished(taskid, true);
   }
   
   /**
    * Child checking whether it can commit 
    */
-  public synchronized boolean canCommit(TaskAttemptID taskid) 
-  throws IOException {
+  public synchronized boolean canCommit(TaskAttemptID taskid,
+      JvmContext jvmContext) throws IOException {
     authorizeJVM(taskid.getJobID());
-    return commitResponses.contains(taskid); //don't remove it now
+    TaskInProgress tip = tasks.get(taskid);
+    validateJVM(tip, jvmContext, taskid);
+    return commitResponses.contains(taskid); // don't remove it now
   }
   
   /**
    * The task is done.
    */
-  public synchronized void done(TaskAttemptID taskid) 
+  public synchronized void done(TaskAttemptID taskid, JvmContext jvmContext) 
   throws IOException {
     authorizeJVM(taskid.getJobID());
     TaskInProgress tip = tasks.get(taskid);
-    commitResponses.remove(taskid);
     if (tip != null) {
+      validateJVM(tip, jvmContext, taskid);
+      commitResponses.remove(taskid);
       tip.reportDone();
     } else {
       LOG.warn("Unknown child task done: "+taskid+". Ignored.");
@@ -3136,22 +3164,36 @@ public class TaskTracker implements MRCo
   /** 
    * A reduce-task failed to shuffle the map-outputs. Kill the task.
    */  
-  public synchronized void shuffleError(TaskAttemptID taskId, String message) 
+  public synchronized void shuffleError(TaskAttemptID taskId, String message, JvmContext jvmContext) 
   throws IOException { 
     authorizeJVM(taskId.getJobID());
-    LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
-    tip.reportDiagnosticInfo("Shuffle Error: " + message);
-    purgeTask(tip, true);
+    if (tip != null) {
+      validateJVM(tip, jvmContext, taskId);
+      LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: "
+          + message);
+      tip.reportDiagnosticInfo("Shuffle Error: " + message);
+      purgeTask(tip, true);
+    } else {
+      LOG.warn("Unknown child task shuffleError: " + taskId + ". Ignored.");
+    }
   }
 
   /** 
    * A child task had a local filesystem error. Kill the task.
    */  
-  public synchronized void fsError(TaskAttemptID taskId, String message) 
-  throws IOException {
+  public synchronized void fsError(TaskAttemptID taskId, String message,
+      JvmContext jvmContext) throws IOException {
     authorizeJVM(taskId.getJobID());
-    fsErrorInternal(taskId, message);  
+    TaskInProgress tip = runningTasks.get(taskId);
+    if (tip != null) {
+      validateJVM(tip, jvmContext, taskId);
+      LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
+      tip.reportDiagnosticInfo("FSError: " + message);
+      purgeTask(tip, true);
+    } else {
+      LOG.warn("Unknown child task fsError: "+taskId+". Ignored.");
+    }
   }
   /**
    * Meant to be used internally
@@ -3170,18 +3212,29 @@ public class TaskTracker implements MRCo
   /** 
    * A child task had a fatal error. Kill the task.
    */  
-  public synchronized void fatalError(TaskAttemptID taskId, String msg) 
-  throws IOException {
+  public synchronized void fatalError(TaskAttemptID taskId, String msg,
+      JvmContext jvmContext) throws IOException {
     authorizeJVM(taskId.getJobID());
-    LOG.fatal("Task: " + taskId + " - Killed : " + msg);
     TaskInProgress tip = runningTasks.get(taskId);
-    tip.reportDiagnosticInfo("Error: " + msg);
-    purgeTask(tip, true);
+    if (tip != null) {
+      validateJVM(tip, jvmContext, taskId);
+      LOG.fatal("Task: " + taskId + " - Killed : " + msg);
+      tip.reportDiagnosticInfo("Error: " + msg);
+      purgeTask(tip, true);
+    } else {
+      LOG.warn("Unknown child task fatalError: "+taskId+". Ignored.");
+    }
   }
 
   public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
-      JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) 
-  throws IOException {
+      JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id,
+      JvmContext jvmContext) throws IOException {
+    TaskInProgress tip = runningTasks.get(id);
+    if (tip == null) {
+      throw new IOException("Unknown task; " + id
+          + ". Ignoring getMapCompletionEvents Request");
+    }
+    validateJVM(tip, jvmContext, id);
     authorizeJVM(jobId);
     TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
     synchronized (shouldReset) {

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Apr 12 01:14:44 2011
@@ -61,7 +61,7 @@ public interface TaskUmbilicalProtocol e
    * Version 18 Added fatalError for child to communicate fatal errors to TT
    * */
 
-  public static final long versionID = 18L;
+  public static final long versionID = 19L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -77,66 +77,78 @@ public interface TaskUmbilicalProtocol e
    * 
    * @param taskId task-id of the child
    * @param taskStatus status of the child
+   * @param jvmContext context the jvmContext running the task.
    * @throws IOException
    * @throws InterruptedException
    * @return True if the task is known
    */
-  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
-  throws IOException, InterruptedException;
+  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,
+      JvmContext context) throws IOException, InterruptedException;
   
   /** Report error messages back to parent.  Calls should be sparing, since all
    *  such messages are held in the job tracker.
    *  @param taskid the id of the task involved
    *  @param trace the text to report
+   *  @param jvmContext context the jvmContext running the task.
    */
-  void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
+  void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+      JvmContext jvmContext) throws IOException;
   
   /**
    * Report the record range which is going to process next by the Task.
    * @param taskid the id of the task involved
    * @param range the range of record sequence nos
+   * @param jvmContext context the jvmContext running the task.
    * @throws IOException
    */
-  void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
-    throws IOException;
+  void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range,
+      JvmContext jvmContext) throws IOException;
 
-  /** Periodically called by child to check if parent is still alive. 
+  /** Periodically called by child to check if parent is still alive.
+   * @param taskid the id of the task involved
+   * @param jvmContext context the jvmContext running the task.
    * @return True if the task is known
    */
-  boolean ping(TaskAttemptID taskid) throws IOException;
+  boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException;
 
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this.
    * @param taskid task's id
+   * @param jvmContext context the jvmContext running the task.
    */
-  void done(TaskAttemptID taskid) throws IOException;
+  void done(TaskAttemptID taskid, JvmContext jvmContext) throws IOException;
   
   /** 
    * Report that the task is complete, but its commit is pending.
    * 
    * @param taskId task's id
    * @param taskStatus status of the child
+   * @param jvmContext context the jvmContext running the task.
    * @throws IOException
    */
-  void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
-  throws IOException, InterruptedException;  
+  void commitPending(TaskAttemptID taskId, TaskStatus taskStatus,
+      JvmContext jvmContext) throws IOException, InterruptedException;  
 
   /**
    * Polling to know whether the task can go-ahead with commit 
    * @param taskid
+   * @param jvmContext context the jvmContext running the task.
    * @return true/false 
    * @throws IOException
    */
-  boolean canCommit(TaskAttemptID taskid) throws IOException;
+  boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) throws IOException;
 
-  /** Report that a reduce-task couldn't shuffle map-outputs.*/
-  void shuffleError(TaskAttemptID taskId, String message) throws IOException;
+  /** Report that a reduce-task couldn't shuffle map-outputs. */
+  void shuffleError(TaskAttemptID taskId, String message, JvmContext jvmContext)
+      throws IOException;
   
   /** Report that the task encounted a local filesystem error.*/
-  void fsError(TaskAttemptID taskId, String message) throws IOException;
+  void fsError(TaskAttemptID taskId, String message, JvmContext jvmContext)
+      throws IOException;
 
   /** Report that the task encounted a fatal error.*/
-  void fatalError(TaskAttemptID taskId, String message) throws IOException;
+  void fatalError(TaskAttemptID taskId, String message, JvmContext jvmContext)
+      throws IOException;
   
   /** Called by a reduce task to get the map output locations for finished maps.
    * Returns an update centered around the map-task-completion-events. 
@@ -154,7 +166,8 @@ public interface TaskUmbilicalProtocol e
   MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
                                                        int fromIndex, 
                                                        int maxLocs,
-                                                       TaskAttemptID id) 
+                                                       TaskAttemptID id,
+                                                       JvmContext jvmContext) 
   throws IOException;
 
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Apr 12 01:14:44 2011
@@ -581,7 +581,7 @@ public class MiniMRCluster {
       new TaskAttemptID(jtId, jobId.getId(), false, 0, 0);
     return taskTrackerList.get(index).getTaskTracker()
                                      .getMapCompletionEvents(jobId, 0, max, 
-                                                             dummy);
+                                                             dummy, null);
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java Tue Apr 12 01:14:44 2011
@@ -99,32 +99,37 @@ public class TestTaskCommit extends Hado
     boolean taskDone = false;
 
     @Override
-    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+    public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext)
+        throws IOException {
       return false;
     }
 
     @Override
-    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
-        throws IOException, InterruptedException {
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus,
+        JvmContext jvmContext) throws IOException, InterruptedException {
       fail("Task should not go to commit-pending");
     }
 
     @Override
-    public void done(TaskAttemptID taskid) throws IOException {
+    public void done(TaskAttemptID taskid, JvmContext jvmContext)
+        throws IOException {
       taskDone = true;
     }
 
     @Override
-    public void fatalError(TaskAttemptID taskId, String message)
-        throws IOException { }
+    public void fatalError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
+    }
 
     @Override
-    public void fsError(TaskAttemptID taskId, String message)
-        throws IOException { }
+    public void fsError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
+    }
 
     @Override
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
-        int fromIndex, int maxLocs, TaskAttemptID id) throws IOException {
+        int fromIndex, int maxLocs, TaskAttemptID id, JvmContext jvmContext)
+        throws IOException {
       return null;
     }
 
@@ -134,28 +139,29 @@ public class TestTaskCommit extends Hado
     }
 
     @Override
-    public boolean ping(TaskAttemptID taskid) throws IOException {
+    public boolean ping(TaskAttemptID taskid, JvmContext jvmContext)
+        throws IOException {
       return true;
     }
 
     @Override
-    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
-        throws IOException {
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+        JvmContext jvmContext) throws IOException {
     }
 
     @Override
-    public void reportNextRecordRange(TaskAttemptID taskid, Range range)
-        throws IOException {
+    public void reportNextRecordRange(TaskAttemptID taskid, Range range,
+        JvmContext jvmContext) throws IOException {
     }
 
     @Override
-    public void shuffleError(TaskAttemptID taskId, String message)
-        throws IOException {
+    public void shuffleError(TaskAttemptID taskId, String message,
+        JvmContext jvmContext) throws IOException {
     }
 
     @Override
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
-        throws IOException, InterruptedException {
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,
+        JvmContext jvmContext) throws IOException, InterruptedException {
       return true;
     }
 
@@ -166,9 +172,9 @@ public class TestTaskCommit extends Hado
     }
 
     @Override
-    public void 
-    updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
-                                       long[] sizes) throws IOException {
+    public void updatePrivateDistributedCacheSizes(
+        org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+        throws IOException {
       // NOTHING
     }
   }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Tue Apr 12 01:14:44 2011
@@ -102,7 +102,7 @@ public class TestUmbilicalProtocolWithJo
           proxy = (TaskUmbilicalProtocol) RPC.getProxy(
               TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
               addr, conf);
-          proxy.ping(null);
+          proxy.ping(null, null);
         } finally {
           server.stop();
           if (proxy != null) {