You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/14 00:35:12 UTC

svn commit: r1213987 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...

Author: vinodkv
Date: Tue Dec 13 23:35:11 2011
New Revision: 1213987

URL: http://svn.apache.org/viewvc?rev=1213987&view=rev
Log:
MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs in the correct directory to work properly in secure mode. Contributed by Hitesh Shah.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 13 23:35:11 2011
@@ -282,6 +282,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3542. Support "FileSystemCounter" legacy counter group name for
     compatibility. (tomwhite)
 
+    MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs
+    in the correct directory to work properly in secure mode. (Hitesh Shah via
+    vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Dec 13 23:35:11 2011
@@ -22,20 +22,19 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.net.URI;
 import java.util.HashSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -47,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
@@ -80,7 +78,10 @@ public class LocalContainerLauncher exte
     super(LocalContainerLauncher.class.getName());
     this.context = context;
     this.umbilical = umbilical;
-        // umbilical:  MRAppMaster creates (taskAttemptListener), passes to us  (TODO/FIXME:  pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar:  implement umbilical protocol but skip RPC stuff)
+        // umbilical:  MRAppMaster creates (taskAttemptListener), passes to us
+        // (TODO/FIXME:  pointless to use RPC to talk to self; should create
+        // LocalTaskAttemptListener or similar:  implement umbilical protocol
+        // but skip RPC stuff)
 
     try {
       curFC = FileContext.getFileContext(curDir.toURI());
@@ -152,7 +153,6 @@ public class LocalContainerLauncher exte
    *     ]]
    *   - runs Task (runSubMap() or runSubReduce())
    *     - TA can safely send TA_UPDATE since in RUNNING state
-   *       [modulo possible TA-state-machine race noted below:  CHECK (TODO)]
    */
   private class SubtaskRunner implements Runnable {
 
@@ -162,6 +162,7 @@ public class LocalContainerLauncher exte
     SubtaskRunner() {
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       ContainerLauncherEvent event = null;
@@ -183,7 +184,7 @@ public class LocalContainerLauncher exte
 
           ContainerRemoteLaunchEvent launchEv =
               (ContainerRemoteLaunchEvent)event;
-          TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME:  can attemptID ever be null?  (only if retrieved over umbilical?)
+          TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
 
           Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
           int numMapTasks = job.getTotalMaps();
@@ -204,7 +205,6 @@ public class LocalContainerLauncher exte
           // port number is set to -1 in this case.
           context.getEventHandler().handle(
               new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-          //FIXME:  race condition here?  or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state?  (probably latter)
 
           if (numMapTasks == 0) {
             doneWithMaps = true;
@@ -259,6 +259,7 @@ public class LocalContainerLauncher exte
       }
     }
 
+    @SuppressWarnings("deprecation")
     private void runSubtask(org.apache.hadoop.mapred.Task task,
                             final TaskType taskType,
                             TaskAttemptId attemptID,
@@ -270,6 +271,19 @@ public class LocalContainerLauncher exte
 
       try {
         JobConf conf = new JobConf(getConfig());
+        conf.set(JobContext.TASK_ID, task.getTaskID().toString());
+        conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
+        conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
+        conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
+        conf.set(JobContext.ID, task.getJobID().toString());
+
+        // Use the AM's local dir env to generate the intermediate step 
+        // output files
+        String[] localSysDirs = StringUtils.getTrimmedStrings(
+            System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+        conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+        LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+            + conf.get(MRConfig.LOCAL_DIR));
 
         // mark this as an uberized subtask so it can set task counter
         // (longer-term/FIXME:  could redefine as job counter and send
@@ -285,12 +299,12 @@ public class LocalContainerLauncher exte
           if (doneWithMaps) {
             LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
                       + attemptID + "), but should be finished with maps");
-            // throw new RuntimeException()  (FIXME: what's appropriate here?)
+            throw new RuntimeException();
           }
 
           MapTask map = (MapTask)task;
+          map.setConf(conf);
 
-          //CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
           map.run(conf, umbilical);
 
           if (renameOutputs) {
@@ -305,19 +319,23 @@ public class LocalContainerLauncher exte
         } else /* TaskType.REDUCE */ {
 
           if (!doneWithMaps) {
-            //check if event-queue empty?  whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):  doesn't send reduce event until maps all done]
+            // check if event-queue empty?  whole idea of counting maps vs. 
+            // checking event queue is a tad wacky...but could enforce ordering
+            // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): 
+            // doesn't send reduce event until maps all done]
             LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
                       + attemptID + "), but not yet finished with maps");
-            // throw new RuntimeException()  (FIXME) // or push reduce event back onto end of queue? (probably former)
+            throw new RuntimeException();
           }
 
-          ReduceTask reduce = (ReduceTask)task;
-
           // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
           // set framework name to local to make task local
           conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
           conf.set(MRConfig.MASTER_ADDRESS, "local");  // bypass shuffle
 
+          ReduceTask reduce = (ReduceTask)task;
+          reduce.setConf(conf);          
+
           reduce.run(conf, umbilical);
           //relocalize();  // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
         }
@@ -334,18 +352,7 @@ public class LocalContainerLauncher exte
         try {
           if (task != null) {
             // do cleanup for the task
-//          if (childUGI == null) { // no need to job into doAs block
-              task.taskCleanup(umbilical);
-//          } else {
-//            final Task taskFinal = task;
-//            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
-//              @Override
-//              public Object run() throws Exception {
-//                taskFinal.taskCleanup(umbilical);
-//                return null;
-//              }
-//            });
-//          }
+            task.taskCleanup(umbilical);
           }
         } catch (Exception e) {
           LOG.info("Exception cleaning up: "
@@ -354,51 +361,21 @@ public class LocalContainerLauncher exte
         // Report back any failures, for diagnostic purposes
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         exception.printStackTrace(new PrintStream(baos));
-//      if (classicAttemptID != null) {
-          umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
-//      }
+        umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
         throw new RuntimeException();
 
       } catch (Throwable throwable) {
         LOG.fatal("Error running local (uberized) 'child' : "
             + StringUtils.stringifyException(throwable));
-//      if (classicAttemptID != null) {
-          Throwable tCause = throwable.getCause();
-          String cause = (tCause == null)
-              ? throwable.getMessage()
-              : StringUtils.stringifyException(tCause);
-          umbilical.fatalError(classicAttemptID, cause);
-//      }
+        Throwable tCause = throwable.getCause();
+        String cause = (tCause == null)
+            ? throwable.getMessage()
+                : StringUtils.stringifyException(tCause);
+            umbilical.fatalError(classicAttemptID, cause);
         throw new RuntimeException();
-
-      } finally {
-/*
-FIXME:  do we need to do any of this stuff?  (guessing not since not in own JVM)
-        RPC.stopProxy(umbilical);
-        DefaultMetricsSystem.shutdown();
-        // Shutting down log4j of the child-vm...
-        // This assumes that on return from Task.run()
-        // there is no more logging done.
-        LogManager.shutdown();
- */
       }
     }
 
-
-/* FIXME:  may not need renameMapOutputForReduce() anymore?  TEST!
-
-${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers;
-contains launch_container.sh script, which, when executed, creates symlinks and 
-sets up env
- "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
- "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
- "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
-
-	OHO!  no further need for this at all?  $taskId is unique per subtask
-	now => should work fine to leave alone.  TODO:  test with teragen or
-	similar
- */
-
     /**
      * Within the _local_ filesystem (not HDFS), all activity takes place within
      * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
@@ -409,14 +386,21 @@ sets up env
      * filenames instead of "file.out". (All of this is entirely internal,
      * so there are no particular compatibility issues.)
      */
+    @SuppressWarnings("deprecation")
     private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
                                           MapOutputFile subMapOutputFile)
     throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
       // move map output to reduce input
       Path mapOut = subMapOutputFile.getOutputFile();
+      FileStatus mStatus = localFs.getFileStatus(mapOut);      
       Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut));
+          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming map output file for task attempt "
+            + mapId.toString() + " from original location " + mapOut.toString()
+            + " to destination " + reduceIn.toString());
+      }
       if (!localFs.mkdirs(reduceIn.getParent())) {
         throw new IOException("Mkdirs failed to create "
             + reduceIn.getParent().toString());
@@ -429,8 +413,7 @@ sets up env
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible.  Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
-     * the exception of the renamed map outputs (see above).
-FIXME:  do we really need to worry about renamed map outputs, or already moved to output dir on commit?  if latter, fix comment
+     * the exception of the renamed map outputs.
      *
      * Any jobs that go out of their way to rename or delete things from the
      * local directory are considered broken and deserve what they get...

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Tue Dec 13 23:35:11 2011
@@ -236,6 +236,13 @@ public class MapReduceChildJVM {
                 getTaskLogFile(TaskLog.LogName.PROFILE)
                 )
             );
+        if (task.isMapTask()) {
+          vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
+        }
+        else {
+          vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
+        }
+        
       }
     }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Dec 13 23:35:11 2011
@@ -156,6 +156,7 @@ public class MRAppMaster extends Composi
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private boolean inRecovery = false;
+  private SpeculatorEventDispatcher speculatorEventDispatcher;
 
   private Job job;
   private Credentials fsTokens = new Credentials(); // Filled during init
@@ -265,8 +266,9 @@ public class MRAppMaster extends Composi
       addIfService(speculator);
     }
 
+    speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
     dispatcher.register(Speculator.EventType.class,
-        new SpeculatorEventDispatcher(conf));
+        speculatorEventDispatcher);
 
     // service to allocate containers from RM (if non-uber) or to fake it (uber)
     containerAllocator = createContainerAllocator(clientService, context);
@@ -386,7 +388,7 @@ public class MRAppMaster extends Composi
         // This will also send the final report to the ResourceManager
         LOG.info("Calling stop for all the services");
         stop();
-        
+
         // Send job-end notification
         try {
           LOG.info("Job end notification started for jobID : "
@@ -401,14 +403,14 @@ public class MRAppMaster extends Composi
       } catch (Throwable t) {
         LOG.warn("Graceful stop failed ", t);
       }
-      
+
       // Cleanup staging directory
       try {
         cleanupStagingDir();
       } catch(IOException io) {
         LOG.warn("Failed to delete staging dir");
       }
-      
+
       //Bring the process down by force.
       //Not needed after HADOOP-7140
       LOG.info("Exiting MR AppMaster..GoodBye!");
@@ -790,10 +792,6 @@ public class MRAppMaster extends Composi
     // job-init to be done completely here.
     jobEventDispatcher.handle(initJobEvent);
 
-    // send init to speculator. This won't yest start as dispatcher isn't
-    // started yet.
-    dispatcher.getEventHandler().handle(
-        new SpeculatorEvent(job.getID(), clock.getTime()));
 
     // JobImpl's InitTransition is done (call above is synchronous), so the
     // "uber-decision" (MR-1220) has been made.  Query job and switch to
@@ -801,9 +799,15 @@ public class MRAppMaster extends Composi
     // and container-launcher services/event-handlers).
 
     if (job.isUber()) {
+      speculatorEventDispatcher.disableSpeculation();
       LOG.info("MRAppMaster uberizing job " + job.getID()
-               + " in local container (\"uber-AM\").");
+               + " in local container (\"uber-AM\") on node "
+               + nmHost + ":" + nmPort + ".");
     } else {
+      // send init to speculator only for non-uber jobs. 
+      // This won't yet start as dispatcher isn't started yet.
+      dispatcher.getEventHandler().handle(
+          new SpeculatorEvent(job.getID(), clock.getTime()));
       LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
                + "job " + job.getID() + ".");
     }
@@ -865,17 +869,24 @@ public class MRAppMaster extends Composi
   private class SpeculatorEventDispatcher implements
       EventHandler<SpeculatorEvent> {
     private final Configuration conf;
+    private volatile boolean disabled;
     public SpeculatorEventDispatcher(Configuration config) {
       this.conf = config;
     }
     @Override
     public void handle(SpeculatorEvent event) {
-      if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+      if (!disabled && 
+          (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
         // Speculator IS enabled, direct the event to there.
         speculator.handle(event);
       }
     }
+
+    public void disableSpeculation() {
+      disabled = true;
+    }
+
   }
 
   private static void validateInputParam(String value, String param)

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Dec 13 23:35:11 2011
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
 import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -583,13 +584,13 @@ public class JobImpl implements org.apac
       if (getState() == JobState.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
-            cleanupProgress, remoteJobConfFile.toString(), amInfos);
+            cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
       }
 
       return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
           appSubmitTime, startTime, finishTime, setupProgress,
           computeProgress(mapTasks), computeProgress(reduceTasks),
-          cleanupProgress, remoteJobConfFile.toString(), amInfos);
+          cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
     } finally {
       readLock.unlock();
     }
@@ -812,6 +813,129 @@ public class JobImpl implements org.apac
     return amInfos;
   }
 
+  /**
+   * Decide whether job can be run in uber mode based on various criteria.
+   * @param dataInputLength Total length for all splits
+   */
+  private void makeUberDecision(long dataInputLength) {
+    //FIXME:  need new memory criterion for uber-decision (oops, too late here;
+    // until AM-resizing supported,
+    // must depend on job client to pass fat-slot needs)
+    // these are no longer "system" settings, necessarily; user may override
+    int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
+
+    //FIXME: handling multiple reduces within a single AM does not seem to
+    //work.
+    // int sysMaxReduces =
+    //     job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+    int sysMaxReduces = 1;
+
+    long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
+        conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is
+    // wrong; get FS from [File?]InputFormat and default block size from that
+
+    long sysMemSizeForUberSlot =
+        conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
+            MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+
+    boolean uberEnabled =
+        conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
+    boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
+    boolean smallInput = (dataInputLength <= sysMaxBytes);
+    // ignoring overhead due to UberAM and statics as negligible here:
+    boolean smallMemory =
+        ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
+            conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
+            <= sysMemSizeForUberSlot)
+            || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
+    boolean notChainJob = !isChainJob(conf);
+
+    // User has overall veto power over uberization, or user can modify
+    // limits (overriding system settings and potentially shooting
+    // themselves in the head).  Note that ChainMapper/Reducer are
+    // fundamentally incompatible with MR-1220; they employ a blocking
+    // queue between the maps/reduces and thus require parallel execution,
+    // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
+    // and thus requires sequential execution.
+    isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
+        && smallInput && smallMemory && notChainJob;
+
+    if (isUber) {
+      LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
+          + numReduceTasks + "r tasks (" + dataInputLength
+          + " input bytes) will run sequentially on single node.");
+
+      // make sure reduces are scheduled only after all map are completed
+      conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
+                        1.0f);
+      // uber-subtask attempts all get launched on same node; if one fails,
+      // probably should retry elsewhere, i.e., move entire uber-AM:  ergo,
+      // limit attempts to 1 (or at most 2?  probably not...)
+      conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+      conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+      // disable speculation
+      conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+      conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    } else {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Not uberizing ").append(jobId).append(" because:");
+      if (!uberEnabled)
+        msg.append(" not enabled;");
+      if (!smallNumMapTasks)
+        msg.append(" too many maps;");
+      if (!smallNumReduceTasks)
+        msg.append(" too many reduces;");
+      if (!smallInput)
+        msg.append(" too much input;");
+      if (!smallMemory)
+        msg.append(" too much RAM;");
+      if (!notChainJob)
+        msg.append(" chainjob");
+      LOG.info(msg.toString());
+    }
+  }
+
+  /**
+   * ChainMapper and ChainReducer must execute in parallel, so they're not
+   * compatible with uberization/LocalContainerLauncher (100% sequential).
+   */
+  private boolean isChainJob(Configuration conf) {
+    boolean isChainJob = false;
+    try {
+      String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
+      if (mapClassName != null) {
+        Class<?> mapClass = Class.forName(mapClassName);
+        if (ChainMapper.class.isAssignableFrom(mapClass))
+          isChainJob = true;
+      }
+    } catch (ClassNotFoundException cnfe) {
+      // don't care; assume it's not derived from ChainMapper
+    }
+    try {
+      String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
+      if (reduceClassName != null) {
+        Class<?> reduceClass = Class.forName(reduceClassName);
+        if (ChainReducer.class.isAssignableFrom(reduceClass))
+          isChainJob = true;
+      }
+    } catch (ClassNotFoundException cnfe) {
+      // don't care; assume it's not derived from ChainReducer
+    }
+    return isChainJob;
+  }
+
+  /*
+  private int getBlockSize() {
+    String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
+    if (inputClassName != null) {
+      Class<?> inputClass - Class.forName(inputClassName);
+      if (FileInputFormat<K, V>)
+    }
+  }
+  */
+
   public static class InitTransition 
       implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
 
@@ -863,81 +987,8 @@ public class JobImpl implements org.apac
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
         }
 
-        //FIXME:  need new memory criterion for uber-decision (oops, too late here; 
-        // until AM-resizing supported, must depend on job client to pass fat-slot needs)
-        // these are no longer "system" settings, necessarily; user may override
-        int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
-        int sysMaxReduces =
-            job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
-        long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
-            job.conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is 
-        // wrong; get FS from [File?]InputFormat and default block size from that
-        //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); 
-        // FIXME [could use default AM-container memory size...]
-
-        boolean uberEnabled =
-            job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-        boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps);
-        boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces);
-        boolean smallInput = (inputLength <= sysMaxBytes);
-        boolean smallMemory = true;  //FIXME (see above)
-            // ignoring overhead due to UberTask and statics as negligible here:
-        //  FIXME   && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
-        //              || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
-        boolean notChainJob = !isChainJob(job.conf);
-
-        // User has overall veto power over uberization, or user can modify
-        // limits (overriding system settings and potentially shooting
-        // themselves in the head).  Note that ChainMapper/Reducer are
-        // fundamentally incompatible with MR-1220; they employ a blocking
-
-        // User has overall veto power over uberization, or user can modify
-        // limits (overriding system settings and potentially shooting
-        // themselves in the head).  Note that ChainMapper/Reducer are
-        // fundamentally incompatible with MR-1220; they employ a blocking
-        // queue between the maps/reduces and thus require parallel execution,
-        // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
-        // and thus requires sequential execution.
-        job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
-            && smallInput && smallMemory && notChainJob;
-
-        if (job.isUber) {
-          LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+"
-              + job.numReduceTasks + "r tasks (" + inputLength
-              + " input bytes) will run sequentially on single node.");
-              //TODO: also note which node?
-
-          // make sure reduces are scheduled only after all map are completed
-          job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
-                            1.0f);
-          // uber-subtask attempts all get launched on same node; if one fails,
-          // probably should retry elsewhere, i.e., move entire uber-AM:  ergo,
-          // limit attempts to 1 (or at most 2?  probably not...)
-          job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
-          job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
-
-          // disable speculation:  makes no sense to speculate an entire job
-          //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old 
-          //version, ultimately was from conf.getMapSpeculativeExecution(), 
-          //conf.getReduceSpeculativeExecution()]
-        } else {
-          StringBuilder msg = new StringBuilder();
-          msg.append("Not uberizing ").append(job.jobId).append(" because:");
-          if (!uberEnabled)
-            msg.append(" not enabled;");
-          if (!smallNumMapTasks)
-            msg.append(" too many maps;");
-          if (!smallNumReduceTasks)
-            msg.append(" too many reduces;");
-          if (!smallInput)
-            msg.append(" too much input;");
-          if (!smallMemory)
-            msg.append(" too much RAM;");
-          if (!notChainJob)
-            msg.append(" chainjob");
-          LOG.info(msg.toString());
-        }
-
+        job.makeUberDecision(inputLength);
+        
         job.taskAttemptCompletionEvents =
             new ArrayList<TaskAttemptCompletionEvent>(
                 job.numMapTasks + job.numReduceTasks + 10);
@@ -1008,35 +1059,6 @@ public class JobImpl implements org.apac
       }
     }
 
-    /**
-     * ChainMapper and ChainReducer must execute in parallel, so they're not
-     * compatible with uberization/LocalContainerLauncher (100% sequential).
-     */
-    boolean isChainJob(Configuration conf) {
-      boolean isChainJob = false;
-      try {
-        String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
-        if (mapClassName != null) {
-          Class<?> mapClass = Class.forName(mapClassName);
-          if (ChainMapper.class.isAssignableFrom(mapClass))
-            isChainJob = true;
-        }
-      } catch (ClassNotFoundException cnfe) {
-        // don't care; assume it's not derived from ChainMapper
-      }
-      try {
-        String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
-        if (reduceClassName != null) {
-          Class<?> reduceClass = Class.forName(reduceClassName);
-          if (ChainReducer.class.isAssignableFrom(reduceClass))
-            isChainJob = true;
-        }
-      } catch (ClassNotFoundException cnfe) {
-        // don't care; assume it's not derived from ChainReducer
-      }
-      return isChainJob;
-    }
-
     private void createMapTasks(JobImpl job, long inputLength,
                                 TaskSplitMetaInfo[] splits) {
       for (int i=0; i < job.numMapTasks; ++i) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Dec 13 23:35:11 2011
@@ -60,8 +60,8 @@ public class LocalContainerAllocator ext
   private static final Log LOG =
       LogFactory.getLog(LocalContainerAllocator.class);
 
+  @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
-//  private final ApplicationId appID;
   private AtomicInteger containerCount = new AtomicInteger();
   private long retryInterval;
   private long retrystartTime;
@@ -73,8 +73,6 @@ public class LocalContainerAllocator ext
                                  AppContext context) {
     super(clientService, context);
     this.eventHandler = context.getEventHandler();
-//    this.appID = context.getApplicationID();
-    
   }
 
   @Override
@@ -88,6 +86,7 @@ public class LocalContainerAllocator ext
     retrystartTime = System.currentTimeMillis();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   protected synchronized void heartbeat() throws Exception {
     AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
@@ -124,6 +123,7 @@ public class LocalContainerAllocator ext
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainerAllocatorEvent event) {
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Dec 13 23:35:11 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,7 +66,7 @@ public abstract class RMCommunicator ext
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
   protected ApplicationAttemptId applicationAttemptId;
-  private volatile boolean stopped;
+  private AtomicBoolean stopped;
   protected Thread allocatorThread;
   protected EventHandler eventHandler;
   protected AMRMProtocol scheduler;
@@ -88,6 +89,7 @@ public abstract class RMCommunicator ext
     this.eventHandler = context.getEventHandler();
     this.applicationId = context.getApplicationID();
     this.applicationAttemptId = context.getApplicationAttemptId();
+    this.stopped = new AtomicBoolean(false);
   }
 
   @Override
@@ -213,7 +215,10 @@ public abstract class RMCommunicator ext
 
   @Override
   public void stop() {
-    stopped = true;
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     allocatorThread.interrupt();
     try {
       allocatorThread.join();
@@ -228,7 +233,7 @@ public abstract class RMCommunicator ext
     allocatorThread = new Thread(new Runnable() {
       @Override
       public void run() {
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
             Thread.sleep(rmPollInterval);
             try {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Tue Dec 13 23:35:11 2011
@@ -183,6 +183,7 @@ public class TestMRClientService {
     Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
         .getAttemptId());
     Assert.assertTrue(amInfo.getStartTime() > 0);
+    Assert.assertEquals(false, jr.isUber());
   }
   
   private void verifyTaskAttemptReport(TaskAttemptReport tar) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 13 23:35:11 2011
@@ -118,7 +118,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
-            0, 0, 0, 0, 0, 0, "jobfile", null));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -195,7 +195,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -261,7 +261,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -375,7 +375,7 @@ public class TestRMContainerAllocator {
     public JobReport getReport() {
       return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
           JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
-          this.reduceProgress, this.cleanupProgress, "jobfile", null);
+          this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
     }
   }
 
@@ -511,7 +511,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -610,7 +610,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Tue Dec 13 23:35:11 2011
@@ -288,7 +288,7 @@ public class TypeConverter {
             .getMapProgress(), jobreport.getReduceProgress(), jobreport
             .getCleanupProgress(), fromYarn(jobreport.getJobState()),
         jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
-            .getJobFile(), trackingUrl);
+            .getJobFile(), trackingUrl, jobreport.isUber());
     jobStatus.setFailureInfo(jobreport.getDiagnostics());
     return jobStatus;
   }
@@ -421,7 +421,7 @@ public class TypeConverter {
           TypeConverter.fromYarn(application.getYarnApplicationState()),
           org.apache.hadoop.mapreduce.JobPriority.NORMAL,
           application.getUser(), application.getName(),
-          application.getQueue(), jobFile, trackingUrl
+          application.getQueue(), jobFile, trackingUrl, false
       );
     jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
     jobStatus.setStartTime(application.getStartTime());

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java Tue Dec 13 23:35:11 2011
@@ -36,6 +36,7 @@ public interface JobReport {
   public abstract String getDiagnostics();
   public abstract String getJobFile();
   public abstract List<AMInfo> getAMInfos();
+  public abstract boolean isUber();
 
   public abstract void setJobId(JobId jobId);
   public abstract void setJobState(JobState jobState);
@@ -52,4 +53,5 @@ public interface JobReport {
   public abstract void setDiagnostics(String diagnostics);
   public abstract void setJobFile(String jobFile);
   public abstract void setAMInfos(List<AMInfo> amInfos);
+  public abstract void setIsUber(boolean isUber);
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java Tue Dec 13 23:35:11 2011
@@ -332,4 +332,16 @@ public class JobReportPBImpl extends Pro
   private JobState convertFromProtoFormat(JobStateProto e) {
     return MRProtoUtils.convertFromProtoFormat(e);
   }
+
+  @Override
+  public synchronized boolean isUber() {
+    JobReportProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getIsUber();
+  }
+
+  @Override
+  public synchronized void setIsUber(boolean isUber) {
+    maybeInitBuilder();
+    builder.setIsUber(isUber);
+  }
 }  

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Tue Dec 13 23:35:11 2011
@@ -60,7 +60,8 @@ public class MRBuilderUtils {
   public static JobReport newJobReport(JobId jobId, String jobName,
       String userName, JobState state, long submitTime, long startTime, long finishTime,
       float setupProgress, float mapProgress, float reduceProgress,
-      float cleanupProgress, String jobFile, List<AMInfo> amInfos) {
+      float cleanupProgress, String jobFile, List<AMInfo> amInfos,
+      boolean isUber) {
     JobReport report = Records.newRecord(JobReport.class);
     report.setJobId(jobId);
     report.setJobName(jobName);
@@ -75,6 +76,7 @@ public class MRBuilderUtils {
     report.setReduceProgress(reduceProgress);
     report.setJobFile(jobFile);
     report.setAMInfos(amInfos);
+    report.setIsUber(isUber);
     return report;
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Tue Dec 13 23:35:11 2011
@@ -152,6 +152,7 @@ message JobReportProto {
   optional string jobFile = 13;
   repeated AMInfoProto am_infos = 14;
   optional int64 submit_time = 15;
+  optional bool is_uber = 16 [default = false];
 }
 
 message AMInfoProto {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Tue Dec 13 23:35:11 2011
@@ -97,7 +97,7 @@ public class JobStatus extends org.apach
                    String user, String jobName, 
                    String jobFile, String trackingUrl) {
     this(jobid, mapProgress, reduceProgress, cleanupProgress, runState,
-                  JobPriority.NORMAL, user, jobName, jobFile, trackingUrl);
+        JobPriority.NORMAL, user, jobName, jobFile, trackingUrl);
   }
 
   /**
@@ -135,7 +135,8 @@ public class JobStatus extends org.apach
                       String user, String jobName, String jobFile, 
                       String trackingUrl) {
      this(jobid, 0.0f, mapProgress, reduceProgress, 
-          cleanupProgress, runState, jp, user, jobName, jobFile, trackingUrl);
+          cleanupProgress, runState, jp, user, jobName, jobFile,
+          trackingUrl);
    }
    
   /**
@@ -157,9 +158,31 @@ public class JobStatus extends org.apach
                     int runState, JobPriority jp, String user, String jobName, 
                     String jobFile, String trackingUrl) {
      this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
-         runState, jp,
-         user, jobName, "default", jobFile, trackingUrl);
+         runState, jp, user, jobName, "default", jobFile, trackingUrl);
    }
+
+   /**
+    * Create a job status object for a given jobid.
+    * @param jobid The jobid of the job
+    * @param setupProgress The progress made on the setup
+    * @param mapProgress The progress made on the maps
+    * @param reduceProgress The progress made on the reduces
+    * @param cleanupProgress The progress made on the cleanup
+    * @param runState The current state of the job
+    * @param jp Priority of the job.
+    * @param user userid of the person who submitted the job.
+    * @param jobName user-specified job name.
+    * @param jobFile job configuration file. 
+    * @param trackingUrl link to the web-ui for details of the job.
+    * @param isUber Whether job running in uber mode
+    */
+    public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+                     float reduceProgress, float cleanupProgress, 
+                     int runState, JobPriority jp, String user, String jobName, 
+                     String jobFile, String trackingUrl, boolean isUber) {
+      this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+          runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber);
+    }   
    
    /**
     * Create a job status object for a given jobid.
@@ -173,27 +196,52 @@ public class JobStatus extends org.apach
     * @param user userid of the person who submitted the job.
     * @param jobName user-specified job name.
     * @param queue job queue name.
+    * @param jobFile job configuration file.
+    * @param trackingUrl link to the web-ui for details of the job.
+    */
+   public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+       float reduceProgress, float cleanupProgress,
+       int runState, JobPriority jp,
+       String user, String jobName, String queue,
+       String jobFile, String trackingUrl) {
+     this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+         runState, jp,
+         user, jobName, queue, jobFile, trackingUrl, false);
+   }
+
+   /**
+    * Create a job status object for a given jobid.
+    * @param jobid The jobid of the job
+    * @param setupProgress The progress made on the setup
+    * @param mapProgress The progress made on the maps
+    * @param reduceProgress The progress made on the reduces
+    * @param cleanupProgress The progress made on the cleanup
+    * @param runState The current state of the job
+    * @param jp Priority of the job.
+    * @param user userid of the person who submitted the job.
+    * @param jobName user-specified job name.
+    * @param queue job queue name.
     * @param jobFile job configuration file. 
     * @param trackingUrl link to the web-ui for details of the job.
+    * @param isUber Whether job running in uber mode
     */
    public JobStatus(JobID jobid, float setupProgress, float mapProgress,
        float reduceProgress, float cleanupProgress, 
        int runState, JobPriority jp, 
        String user, String jobName, String queue, 
-       String jobFile, String trackingUrl) {
+       String jobFile, String trackingUrl, boolean isUber) {
      super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
          getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
-         user, jobName, queue, jobFile, trackingUrl);
+         user, jobName, queue, jobFile, trackingUrl, isUber);
    }
 
-
   public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){
     JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()),
       stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
       stat.getCleanupProgress(), stat.getState().getValue(), 
       JobPriority.valueOf(stat.getPriority().name()),
       stat.getUsername(), stat.getJobName(), stat.getJobFile(),
-      stat.getTrackingUrl());
+      stat.getTrackingUrl(), stat.isUber());
     old.setStartTime(stat.getStartTime());
     old.setFinishTime(stat.getFinishTime());
     old.setSchedulingInfo(stat.getSchedulingInfo());

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Tue Dec 13 23:35:11 2011
@@ -467,6 +467,7 @@ public class Job extends JobContextImpl 
     sb.append("Job File: ").append(status.getJobFile()).append("\n");
     sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
     sb.append("\n");
+    sb.append("Uber job : ").append(status.isUber()).append("\n");
     sb.append("map() completion: ");
     sb.append(status.getMapProgress()).append("\n");
     sb.append("reduce() completion: ");
@@ -1268,12 +1269,20 @@ public class Job extends JobContextImpl 
       Job.getProgressPollInterval(clientConf);
     /* make sure to report full progress after the job is done */
     boolean reportedAfterCompletion = false;
+    boolean reportedUberMode = false;
     while (!isComplete() || !reportedAfterCompletion) {
       if (isComplete()) {
         reportedAfterCompletion = true;
       } else {
         Thread.sleep(progMonitorPollIntervalMillis);
       }
+      if (status.getState() == JobStatus.State.PREP) {
+        continue;
+      }      
+      if (!reportedUberMode) {
+        reportedUberMode = true;
+        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
+      }      
       String report = 
         (" map " + StringUtils.formatPercent(mapProgress(), 0)+
             " reduce " + 
@@ -1497,4 +1506,10 @@ public class Job extends JobContextImpl 
     conf.set(Job.OUTPUT_FILTER, newValue.toString());
   }
 
+  public boolean isUber() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.isUber();
+  }
+  
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Tue Dec 13 23:35:11 2011
@@ -97,7 +97,7 @@ public class JobStatus implements Writab
   private int usedMem;
   private int reservedMem;
   private int neededMem;
-
+  private boolean isUber;
     
   /**
    */
@@ -115,17 +115,17 @@ public class JobStatus implements Writab
    * @param jp Priority of the job.
    * @param user userid of the person who submitted the job.
    * @param jobName user-specified job name.
-   * @param jobFile job configuration file. 
+   * @param jobFile job configuration file.
    * @param trackingUrl link to the web-ui for details of the job.
    */
    public JobStatus(JobID jobid, float setupProgress, float mapProgress,
-                    float reduceProgress, float cleanupProgress, 
+                    float reduceProgress, float cleanupProgress,
                     State runState, JobPriority jp, String user, String jobName, 
                     String jobFile, String trackingUrl) {
      this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress, 
-         runState, jp, user, jobName, "default", jobFile, trackingUrl);
+         runState, jp, user, jobName, "default", jobFile, trackingUrl, false);
    }
-           
+
    /**
     * Create a job status object for a given jobid.
     * @param jobid The jobid of the job
@@ -138,14 +138,39 @@ public class JobStatus implements Writab
     * @param user userid of the person who submitted the job.
     * @param jobName user-specified job name.
     * @param queue queue name
-    * @param jobFile job configuration file. 
+    * @param jobFile job configuration file.
     * @param trackingUrl link to the web-ui for details of the job.
     */
     public JobStatus(JobID jobid, float setupProgress, float mapProgress,
-                     float reduceProgress, float cleanupProgress, 
-                     State runState, JobPriority jp, 
-                     String user, String jobName, String queue, 
+                     float reduceProgress, float cleanupProgress,
+                     State runState, JobPriority jp,
+                     String user, String jobName, String queue,
                      String jobFile, String trackingUrl) {
+      this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+          runState, jp, user, jobName, queue, jobFile, trackingUrl, false);
+    }
+
+   /**
+    * Create a job status object for a given jobid.
+    * @param jobid The jobid of the job
+    * @param setupProgress The progress made on the setup
+    * @param mapProgress The progress made on the maps
+    * @param reduceProgress The progress made on the reduces
+    * @param cleanupProgress The progress made on the cleanup
+    * @param runState The current state of the job
+    * @param jp Priority of the job.
+    * @param user userid of the person who submitted the job.
+    * @param jobName user-specified job name.
+    * @param queue queue name
+    * @param jobFile job configuration file.
+    * @param trackingUrl link to the web-ui for details of the job.
+    * @param isUber Whether job running in uber mode
+    */
+    public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+                     float reduceProgress, float cleanupProgress,
+                     State runState, JobPriority jp,
+                     String user, String jobName, String queue,
+                     String jobFile, String trackingUrl, boolean isUber) {
       this.jobid = jobid;
       this.setupProgress = setupProgress;
       this.mapProgress = mapProgress;
@@ -161,8 +186,9 @@ public class JobStatus implements Writab
       this.jobName = jobName;
       this.jobFile = jobFile;
       this.trackingUrl = trackingUrl;
+      this.isUber = isUber;
     }
-    
+
 
   /**
    * Sets the map progress of this job
@@ -411,6 +437,7 @@ public class JobStatus implements Writab
     Text.writeString(out, jobName);
     Text.writeString(out, trackingUrl);
     Text.writeString(out, jobFile);
+    out.writeBoolean(isUber);
 
     // Serialize the job's ACLs
     out.writeInt(jobACLs.size());
@@ -438,6 +465,7 @@ public class JobStatus implements Writab
     this.jobName = Text.readString(in);
     this.trackingUrl = Text.readString(in);
     this.jobFile = Text.readString(in);
+    this.isUber = in.readBoolean();
 
     // De-serialize the job's ACLs
     int numACLs = in.readInt();
@@ -562,9 +590,26 @@ public class JobStatus implements Writab
     this.neededMem = n;
   }
 
+  /**
+   * Whether job running in uber mode
+   * @return job in uber-mode
+   */
+  public synchronized boolean isUber() {
+    return isUber;
+  }
+  
+  /**
+   * Set uber-mode flag 
+   * @param isUber Whether job running in uber-mode
+   */
+  public synchronized void setUber(boolean isUber) {
+    this.isUber = isUber;
+  }
+  
   public String toString() {
     StringBuffer buffer = new StringBuffer();
     buffer.append("job-id : " + jobid);
+    buffer.append("uber-mode : " + isUber);
     buffer.append("map-progress : " + mapProgress);
     buffer.append("reduce-progress : " + reduceProgress);
     buffer.append("cleanup-progress : " + cleanupProgress);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Dec 13 23:35:11 2011
@@ -150,6 +150,10 @@ public interface MRJobConfig {
 
   public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";
 
+  public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
+  
+  public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
+  
   public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
 
   public static final String TASK_ID = "mapreduce.task.id";
@@ -298,12 +302,6 @@ public interface MRJobConfig {
     "mapreduce.job.ubertask.maxreduces";
   public static final String JOB_UBERTASK_MAXBYTES =
     "mapreduce.job.ubertask.maxbytes";
-  public static final String UBERTASK_JAVA_OPTS =
-    "mapreduce.ubertask.child.java.opts";  // or mapreduce.uber.java.opts?
-  public static final String UBERTASK_ULIMIT =
-    "mapreduce.ubertask.child.ulimit";     // or mapreduce.uber.ulimit?
-  public static final String UBERTASK_ENV =
-    "mapreduce.ubertask.child.env";        // or mapreduce.uber.env?
 
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java Tue Dec 13 23:35:11 2011
@@ -63,17 +63,20 @@ public class TestJobMonitorAndPrint exte
     when(cluster.getConf()).thenReturn(conf);
     when(cluster.getClient()).thenReturn(clientProtocol);
     JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f, 
-        State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
+        State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", 
+        "tmp-jobfile", "tmp-url");
     job = Job.getInstance(cluster, jobStatus, conf);
     job = spy(job);
   }
 
   @Test
   public void testJobMonitorAndPrint() throws Exception {
-    JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f, 0.1f, 0f, 
-        State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
-    JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f, 1f, 1f, 
-        State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
+    JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
+        0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+        "tmp-queue", "tmp-jobfile", "tmp-url", true);
+    JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
+        1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+        "tmp-queue", "tmp-jobfile", "tmp-url", true);
 
     doAnswer(
         new Answer<TaskCompletionEvent[]>() {
@@ -102,15 +105,21 @@ public class TestJobMonitorAndPrint exte
     String line;
     boolean foundHundred = false;
     boolean foundComplete = false;
-    String match_1 = "map 100% reduce 100%";
-    String match_2 = "completed successfully";
+    boolean foundUber = false;
+    String match_1 = "uber mode : true";
+    String match_2 = "map 100% reduce 100%";
+    String match_3 = "completed successfully";
     while ((line = r.readLine()) != null) {
-      foundHundred = line.contains(match_1);
+      if (line.contains(match_1)) {
+        foundUber = true;
+      }
+      foundHundred = line.contains(match_2);      
       if (foundHundred)
         break;
     }
     line = r.readLine();
-    foundComplete = line.contains(match_2);
+    foundComplete = line.contains(match_3);
+    assertTrue(foundUber);
     assertTrue(foundHundred);
     assertTrue(foundComplete);
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue Dec 13 23:35:11 2011
@@ -107,6 +107,7 @@ public class CompletedJob implements org
     report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
         .toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
     report.setAMInfos(getAMInfos());
+    report.setIsUber(isUber());
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Tue Dec 13 23:35:11 2011
@@ -168,7 +168,7 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
     when(jobReportResponse1.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
 
     // First AM returns a report with jobName firstGen and simulates AM shutdown
     // on second invocation.
@@ -180,7 +180,7 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
     when(jobReportResponse2.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
 
     // Second AM generation returns a report with jobName secondGen
     MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue Dec 13 23:35:11 2011
@@ -49,6 +49,7 @@ public class TestUberAM extends TestMRJo
   }
 
   @Override
+  @Test
   public void testSleepJob()
   throws IOException, InterruptedException, ClassNotFoundException {
     if (mrCluster != null) {
@@ -84,6 +85,7 @@ public class TestUberAM extends TestMRJo
   }
 
   @Override
+  @Test
   public void testRandomWriter()
   throws IOException, InterruptedException, ClassNotFoundException {
     super.testRandomWriter();
@@ -101,6 +103,7 @@ public class TestUberAM extends TestMRJo
   }
 
   @Override
+  @Test
   public void testFailingMapper()
   throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("\n\n\nStarting uberized testFailingMapper().");