Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC

svn commit: r1196458 [3/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cli...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Nov  2 05:34:31 2011
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,17 +33,27 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -70,17 +82,20 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -108,19 +123,27 @@ import org.apache.hadoop.yarn.util.Conve
  * The information is shared across different components using AppContext.
  */
 
+@SuppressWarnings("deprecation")
 public class MRAppMaster extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
 
   private Clock clock;
-  private final long startTime = System.currentTimeMillis();
+  private final long startTime;
+  private final long appSubmitTime;
   private String appName;
   private final ApplicationAttemptId appAttemptID;
+  private final ContainerId containerID;
+  private final String nmHost;
+  private final int nmPort;
+  private final int nmHttpPort;
   protected final MRAppMetrics metrics;
   private Set<TaskId> completedTasksFromPreviousRun;
+  private List<AMInfo> amInfos;
   private AppContext context;
   private Dispatcher dispatcher;
   private ClientService clientService;
+  private Recovery recoveryServ;
   private ContainerAllocator containerAllocator;
   private ContainerLauncher containerLauncher;
   private TaskCleaner taskCleaner;
@@ -128,39 +151,83 @@ public class MRAppMaster extends Composi
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private JobId jobId;
+  private boolean newApiCommitter;
+  private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
+  private boolean inRecovery = false;
 
   private Job job;
-  
-  public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
-    this(applicationAttemptId, new SystemClock());
+  private Credentials fsTokens = new Credentials(); // Filled during init
+  private UserGroupInformation currentUser; // Will be set up during init
+
+  public MRAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+      long appSubmitTime) {
+    this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
+        new SystemClock(), appSubmitTime);
   }
 
-  public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
+  public MRAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+      Clock clock, long appSubmitTime) {
     super(MRAppMaster.class.getName());
     this.clock = clock;
+    this.startTime = clock.getTime();
+    this.appSubmitTime = appSubmitTime;
     this.appAttemptID = applicationAttemptId;
+    this.containerID = containerId;
+    this.nmHost = nmHost;
+    this.nmPort = nmPort;
+    this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
 
   @Override
   public void init(final Configuration conf) {
+
+    downloadTokensAndSetupUGI(conf);
+
     context = new RunningAppContext(conf);
 
     // Job name is the same as the app name until we support DAG of jobs
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
 
-    if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
-         && appAttemptID.getAttemptId() > 1) {
-      LOG.info("Recovery is enabled. Will try to recover from previous life.");
-      Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+     
+    newApiCommitter = false;
+    jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+        appAttemptID.getApplicationId().getId());
+    int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if ((numReduceTasks > 0 && 
+        conf.getBoolean("mapred.reducer.new-api", false)) ||
+          (numReduceTasks == 0 && 
+           conf.getBoolean("mapred.mapper.new-api", false)))  {
+      newApiCommitter = true;
+      LOG.info("Using mapred newApiCommitter.");
+    }
+
+    committer = createOutputCommitter(conf);
+    boolean recoveryEnabled = conf.getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+    if (recoveryEnabled && recoverySupportedByCommitter
+        && appAttemptID.getAttemptId() > 1) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      recoveryServ = new RecoveryService(appAttemptID, clock, 
+          committer);
       addIfService(recoveryServ);
       dispatcher = recoveryServ.getDispatcher();
       clock = recoveryServ.getClock();
-      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+      inRecovery = true;
     } else {
+      LOG.info("Not starting RecoveryService: recoveryEnabled: "
+          + recoveryEnabled + " recoverySupportedByCommitter: "
+          + recoverySupportedByCommitter + " ApplicationAttemptID: "
+          + appAttemptID.getAttemptId());
       dispatcher = new AsyncDispatcher();
       addIfService(dispatcher);
     }
@@ -223,15 +290,165 @@ public class MRAppMaster extends Composi
     super.init(conf);
   } // end of init()
 
+  private OutputCommitter createOutputCommitter(Configuration conf) {
+    OutputCommitter committer = null;
+
+    LOG.info("OutputCommitter set in config "
+        + conf.get("mapred.output.committer.class"));
+
+    if (newApiCommitter) {
+      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
+          .newTaskId(jobId, 0, TaskType.MAP);
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
+          .newTaskAttemptId(taskID, 0);
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptID));
+      OutputFormat outputFormat;
+      try {
+        outputFormat = ReflectionUtils.newInstance(taskContext
+            .getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } catch (Exception e) {
+        throw new YarnException(e);
+      }
+    } else {
+      committer = ReflectionUtils.newInstance(conf.getClass(
+          "mapred.output.committer.class", FileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+    }
+    LOG.info("OutputCommitter is " + committer.getClass().getName());
+    return committer;
+  }
+
+  protected boolean keepJobFiles(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null || conf
+        .getKeepFailedTaskFiles());
+  }
+  
+  /**
+   * Create the default file system for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+  
+  /**
+   * Clean up the staging directory for the job.
+   * @throws IOException
+   */
+  public void cleanupStagingDir() throws IOException {
+    /* make sure we clean the staging files */
+    String jobTempDir = null;
+    FileSystem fs = getFileSystem(getConfig());
+    try {
+      if (!keepJobFiles(new JobConf(getConfig()))) {
+        jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
+        if (jobTempDir == null) {
+          LOG.warn("Job Staging directory is null");
+          return;
+        }
+        Path jobTempDirPath = new Path(jobTempDir);
+        LOG.info("Deleting staging directory " + FileSystem.getDefaultUri(getConfig()) +
+            " " + jobTempDir);
+        fs.delete(jobTempDirPath, true);
+      }
+    } catch(IOException io) {
+      LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
+    }
+  }
+  
+  /**
+   * Exit call. Wrapped in a method so tests can override it.
+   */
+  protected void sysexit() {
+    System.exit(0);
+  }
+  
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // job has finished
+      // this is the only job, so shut down the AppMaster
+      // note in a workflow scenario, this may lead to creation of a new
+      // job (FIXME?)
+
+      // TODO: currently just wait for some time so clients can know the
+      // final states. Will be removed once the RM comes up.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      try {
+        // Stop all services
+        // This will also send the final report to the ResourceManager
+        LOG.info("Calling stop for all the services");
+        stop();
+        
+        // Send job-end notification
+        try {
+          LOG.info("Job end notification started for jobID : "
+            + job.getReport().getJobId());
+          JobEndNotifier notifier = new JobEndNotifier();
+          notifier.setConf(getConfig());
+          notifier.notify(job.getReport());
+        } catch (InterruptedException ie) {
+          LOG.warn("Job end notification interrupted for jobID : "
+            + job.getReport().getJobId(), ie );
+        }
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+      
+      // Cleanup staging directory
+      try {
+        cleanupStagingDir();
+      } catch(IOException io) {
+        LOG.warn("Failed to delete staging dir", io);
+      }
+      
+      //Bring the process down by force.
+      //Not needed after HADOOP-7140
+      LOG.info("Exiting MR AppMaster..GoodBye!");
+      sysexit();
+    }
+  }
+  
+  /**
+   * Create an event handler that handles the job finish event.
+   * @return the job finish event handler.
+   */
+  protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
+    return new JobFinishEventHandler();
+  }
+
   /** Create and initialize (but don't start) a single job. */
   protected Job createJob(Configuration conf) {
 
-    // ////////// Obtain the tokens needed by the job. //////////
-    Credentials fsTokens = new Credentials();
-    UserGroupInformation currentUser = null;
+    // create single job
+    Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
+        .getEventHandler(), taskAttemptListener, jobTokenSecretManager,
+        fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
+        newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
+    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
+
+    dispatcher.register(JobFinishEvent.Type.class,
+        createJobFinishEventHandler());     
+    return newJob;
+  } // end createJob()
+
+
+  /**
+   * Obtain the tokens needed by the job and put them in the UGI
+   * @param conf
+   */
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
 
     try {
-      currentUser = UserGroupInformation.getCurrentUser();
+      this.currentUser = UserGroupInformation.getCurrentUser();
 
       if (UserGroupInformation.isSecurityEnabled()) {
         // Read the file-system tokens from the localized tokens-file.
@@ -246,56 +463,18 @@ public class MRAppMaster extends Composi
             + jobTokenFile);
 
         for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
-          LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
-              + "in current ugi in the AppMaster for service "
-              + tk.getService());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Token of kind " + tk.getKind()
+                + " in current ugi in the AppMaster for service "
+                + tk.getService());
+          }
           currentUser.addToken(tk); // For use by AppMaster itself.
         }
       }
     } catch (IOException e) {
       throw new YarnException(e);
     }
-    // ////////// End of obtaining the tokens needed by the job. //////////
-
-    // create single job
-    Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
-        taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-        completedTasksFromPreviousRun, metrics, currentUser.getUserName());
-    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
-
-    dispatcher.register(JobFinishEvent.Type.class,
-        new EventHandler<JobFinishEvent>() {
-          @Override
-          public void handle(JobFinishEvent event) {
-            // job has finished
-            // this is the only job, so shut down the Appmaster
-            // note in a workflow scenario, this may lead to creation of a new
-            // job (FIXME?)
-
-            // TODO:currently just wait for some time so clients can know the
-            // final states. Will be removed once RM come on.
-            try {
-              Thread.sleep(5000);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-            LOG.info("Calling stop for all the services");
-            try {
-              stop();
-            } catch (Throwable t) {
-              LOG.warn("Graceful stop failed ", t);
-            }
-            //TODO: this is required because rpc server does not shut down
-            // in spite of calling server.stop().
-            //Bring the process down by force.
-            //Not needed after HADOOP-7140
-            LOG.info("Exiting MR AppMaster..GoodBye!");
-            System.exit(0);
-          }
-        });
-
-    return newJob;
-  } // end createJob()
+  }
 
   protected void addIfService(Object object) {
     if (object instanceof Service) {
@@ -373,6 +552,22 @@ public class MRAppMaster extends Composi
     return appAttemptID.getApplicationId();
   }
 
+  public ApplicationAttemptId getAttemptID() {
+    return appAttemptID;
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public boolean isNewApiCommitter() {
+    return newApiCommitter;
+  }
+
   public int getStartCount() {
     return appAttemptID.getAttemptId();
   }
@@ -389,6 +584,10 @@ public class MRAppMaster extends Composi
     return completedTasksFromPreviousRun;
   }
 
+  public List<AMInfo> getAllAMInfos() {
+    return amInfos;
+  }
+  
   public ContainerAllocator getContainerAllocator() {
     return containerAllocator;
   }
@@ -522,6 +721,7 @@ public class MRAppMaster extends Composi
       return jobs;
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public EventHandler getEventHandler() {
       return dispatcher.getEventHandler();
@@ -538,13 +738,39 @@ public class MRAppMaster extends Composi
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void start() {
 
-    ///////////////////// Create the job itself.
+    // Pull completedTasks etc from recovery
+    if (inRecovery) {
+      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+      amInfos = recoveryServ.getAMInfos();
+    }
+
+    // Create the AMInfo for the current AppMaster
+    if (amInfos == null) {
+      amInfos = new LinkedList<AMInfo>();
+    }
+    AMInfo amInfo =
+        MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
+            nmPort, nmHttpPort);
+    amInfos.add(amInfo);
+
+    // /////////////////// Create the job itself.
     job = createJob(getConfig());
+
     // End of creating the job.
 
+    // Send out an MR AM inited event for this AM and all previous AMs.
+    for (AMInfo info : amInfos) {
+      dispatcher.getEventHandler().handle(
+          new JobHistoryEvent(job.getID(), new AMStartedEvent(info
+              .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
+              info.getNodeManagerHost(), info.getNodeManagerPort(), info
+                  .getNodeManagerHttpPort())));
+    }
+
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
@@ -590,6 +816,7 @@ public class MRAppMaster extends Composi
    * In a typical workflow, one presumably would want to uberize only a subset
    * of the jobs (the "small" ones), which is awkward with the current design.
    */
+  @SuppressWarnings("unchecked")
   protected void startJobs() {
     /** create a job-start event to get this ball rolling */
     JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
@@ -598,6 +825,7 @@ public class MRAppMaster extends Composi
   }
 
   private class JobEventDispatcher implements EventHandler<JobEvent> {
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(JobEvent event) {
       ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
@@ -605,6 +833,7 @@ public class MRAppMaster extends Composi
   }
 
   private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskEvent event) {
       Task task = context.getJob(event.getTaskID().getJobId()).getTask(
@@ -615,6 +844,7 @@ public class MRAppMaster extends Composi
 
   private class TaskAttemptEventDispatcher 
           implements EventHandler<TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskAttemptEvent event) {
       Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId());
@@ -640,19 +870,44 @@ public class MRAppMaster extends Composi
     }
   }
 
+  private static void validateInputParam(String value, String param)
+      throws IOException {
+    if (value == null) {
+      String msg = param + " is null";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
   public static void main(String[] args) {
     try {
-      String applicationAttemptIdStr = System
-          .getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
-      if (applicationAttemptIdStr == null) {
-        String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV
-            + " is null";
-        LOG.error(msg);
-        throw new IOException(msg);
-      }
-      ApplicationAttemptId applicationAttemptId = ConverterUtils
-          .toApplicationAttemptId(applicationAttemptIdStr);
-      MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
+      String containerIdStr =
+          System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+      String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+      String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+      String nodeHttpPortString =
+          System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
+      String appSubmitTimeStr =
+          System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      
+      validateInputParam(containerIdStr,
+          ApplicationConstants.AM_CONTAINER_ID_ENV);
+      validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+      validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+      validateInputParam(nodeHttpPortString,
+          ApplicationConstants.NM_HTTP_PORT_ENV);
+      validateInputParam(appSubmitTimeStr,
+          ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+      ApplicationAttemptId applicationAttemptId =
+          containerId.getApplicationAttemptId();
+      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+      
+      MRAppMaster appMaster =
+          new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
+              Integer.parseInt(nodePortString),
+              Integer.parseInt(nodeHttpPortString), appSubmitTime);
       Runtime.getRuntime().addShutdownHook(
           new CompositeServiceShutdownHook(appMaster));
       YarnConfiguration conf = new YarnConfiguration(new JobConf());
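
The committer wiring above moves the new-API/old-API decision out of JobImpl's
InitTransition and into MRAppMaster.init(). The rule itself reduces to two
config flags; a minimal sketch, restated outside the AM (the class name and
main() are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;

public class CommitterApiCheck {
  // Mirrors the test in MRAppMaster.init(): with reduce tasks present the
  // reducer-side flag decides; in a map-only job the mapper-side flag does.
  static boolean usesNewApiCommitter(Configuration conf, int numReduceTasks) {
    return (numReduceTasks > 0
            && conf.getBoolean("mapred.reducer.new-api", false))
        || (numReduceTasks == 0
            && conf.getBoolean("mapred.mapper.new-api", false));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("mapred.reducer.new-api", true);
    System.out.println(usesNewApiCommitter(conf, 2)); // true: new-API committer
    System.out.println(usesNewApiCommitter(conf, 0)); // false: map-only, mapper flag unset
  }
}

When the rule selects the new API, createOutputCommitter() instantiates the
job's OutputFormat reflectively and asks it for a committer; otherwise it
falls back to mapred.output.committer.class (default FileOutputCommitter).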

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java Wed Nov  2 05:34:31 2011
@@ -27,7 +27,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.yarn.proto.MRClientProtocol;
-import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
+import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
 
 public class MRClientSecurityInfo extends SecurityInfo {
 
@@ -51,7 +51,7 @@ public class MRClientSecurityInfo extend
       @Override
       public Class<? extends TokenSelector<? extends TokenIdentifier>>
           value() {
-        return ApplicationTokenSelector.class;
+        return ClientTokenSelector.class;
       }
     };
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Wed Nov  2 05:34:31 2011
@@ -24,12 +24,35 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 
+/**
+ * This interface listens for changes to the state of a Task.
+ */
 public interface TaskAttemptListener {
 
   InetSocketAddress getAddress();
 
-  void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+  /**
+   * Register a JVM with the listener. This should be called as soon as a
+   * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param jvmID The ID of the JVM.
+   */
+  void registerPendingTask(WrappedJvmID jvmID);
+  
+  /**
+   * Register the task and task attempt with the JVM.  This should be called
+   * when the JVM has been launched.
+   * @param attemptID the id of the attempt for this JVM.
+   * @param task the task itself for this JVM.
+   * @param jvmID the id of the JVM handling the task.
+   */
+  void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
 
+  /**
+   * Unregister the JVM and the attempt associated with it.  This should be 
+   * called when the attempt/JVM has finished executing and is being cleaned up.
+   * @param attemptID the ID of the attempt.
+   * @param jvmID the ID of the JVM for that attempt.
+   */
   void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
 
 }
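
Splitting the old register() into registerPendingTask()/registerLaunchedTask()
encodes a three-phase JVM lifecycle. A toy mirror of that lifecycle, using
String stand-ins for the real WrappedJvmID and TaskAttemptId types:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ToyTaskAttemptListener {
  private static final String PENDING = "<pending>";
  private final ConcurrentMap<String, String> jvmToAttempt =
      new ConcurrentHashMap<String, String>();

  // Phase 1: a JVM ID exists but the container has not launched yet.
  void registerPendingTask(String jvmId) {
    jvmToAttempt.put(jvmId, PENDING);
  }

  // Phase 2: the JVM is up; bind the attempt so its calls can be routed.
  void registerLaunchedTask(String attemptId, String jvmId) {
    if (!PENDING.equals(jvmToAttempt.get(jvmId))) {
      throw new IllegalStateException("JVM " + jvmId + " was never pending");
    }
    jvmToAttempt.put(jvmId, attemptId);
  }

  // Phase 3: the attempt is done; drop the binding.
  void unregister(String attemptId, String jvmId) {
    jvmToAttempt.remove(jvmId, attemptId);
  }

  public static void main(String[] args) {
    ToyTaskAttemptListener l = new ToyTaskAttemptListener();
    l.registerPendingTask("jvm_001");               // JVM ID assigned
    l.registerLaunchedTask("attempt_0", "jvm_001"); // JVM launched
    l.unregister("attempt_0", "jvm_001");           // attempt cleaned up
  }
}

The pending phase presumably lets the listener recognize a JVM that checks in
before its task is fully wired up, rather than treating it as unknown.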

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Wed Nov  2 05:34:31 2011
@@ -71,6 +71,7 @@ public class TaskHeartbeatHandler extend
   @Override
   public void start() {
     lostTaskCheckerThread = new Thread(new PingChecker());
+    lostTaskCheckerThread.setName("TaskHeartbeatHandler PingChecker");
     lostTaskCheckerThread.start();
     super.start();
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Wed Nov  2 05:34:31 2011
@@ -18,20 +18,18 @@
 
 package org.apache.hadoop.mapreduce.v2.app.client;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.security.AccessControlException;
 import java.util.Arrays;
 import java.util.Collection;
 
-import org.apache.avro.ipc.Server;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
@@ -72,21 +70,20 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
@@ -131,8 +128,8 @@ public class MRClientService extends Abs
           System
               .getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
       byte[] bytes = Base64.decodeBase64(secretKeyStr);
-      ApplicationTokenIdentifier identifier =
-          new ApplicationTokenIdentifier(this.appContext.getApplicationID());
+      ClientTokenIdentifier identifier = new ClientTokenIdentifier(
+          this.appContext.getApplicationID());
       secretManager.setMasterKey(identifier, bytes);
     }
     server =
@@ -140,6 +137,14 @@ public class MRClientService extends Abs
             conf, secretManager,
             conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT, 
                 MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT));
+    
+    // Enable service authorization?
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+        false)) {
+      refreshServiceAcls(conf, new MRAMPolicyProvider());
+    }
+
     server.start();
     this.bindAddress =
         NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
@@ -154,8 +159,13 @@ public class MRClientService extends Abs
     super.start();
   }
 
+  void refreshServiceAcls(Configuration configuration, 
+      PolicyProvider policyProvider) {
+    this.server.refreshServiceAcl(configuration, policyProvider);
+  }
+
   public void stop() {
-    server.close();
+    server.stop();
     if (webApp != null) {
       webApp.stop();
     }
@@ -183,13 +193,6 @@ public class MRClientService extends Abs
       if (job == null) {
         throw RPCUtil.getRemoteException("Unknown job " + jobID);
       }
-      //TODO fix job acls.
-      //JobACL operation = JobACL.VIEW_JOB;
-      //if (modifyAccess) {
-      //  operation = JobACL.MODIFY_JOB;
-      //}
-      //TO disable check access ofr now.
-      //checkAccess(job, operation);
       return job;
     }
  
@@ -213,24 +216,6 @@ public class MRClientService extends Abs
       return attempt;
     }
 
-    private void checkAccess(Job job, JobACL jobOperation) 
-      throws YarnRemoteException {
-      if (!UserGroupInformation.isSecurityEnabled()) {
-        return;
-      }
-      UserGroupInformation callerUGI;
-      try {
-        callerUGI = UserGroupInformation.getCurrentUser();
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
-      }
-      if(!job.checkAccess(callerUGI, jobOperation)) {
-        throw RPCUtil.getRemoteException(new AccessControlException("User "
-            + callerUGI.getShortUserName() + " cannot perform operation "
-            + jobOperation.name() + " on " + job.getID()));
-      }
-    }
-
     @Override
     public GetCountersResponse getCounters(GetCountersRequest request) 
       throws YarnRemoteException {
@@ -291,6 +276,7 @@ public class MRClientService extends Abs
       return response;
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public KillJobResponse killJob(KillJobRequest request) 
       throws YarnRemoteException {
@@ -307,6 +293,7 @@ public class MRClientService extends Abs
       return response;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public KillTaskResponse killTask(KillTaskRequest request) 
       throws YarnRemoteException {
@@ -321,6 +308,7 @@ public class MRClientService extends Abs
       return response;
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public KillTaskAttemptResponse killTaskAttempt(
         KillTaskAttemptRequest request) throws YarnRemoteException {
@@ -350,6 +338,7 @@ public class MRClientService extends Abs
       return response;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public FailTaskAttemptResponse failTaskAttempt(
         FailTaskAttemptRequest request) throws YarnRemoteException {
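
The authorization hook added above is gated on hadoop.security.authorization.
A small sketch of that gate, assuming only that the key defaults to false
(the class and main() are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class ServiceAuthzGate {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hadoop.security.authorization defaults to false: ACL checks are opt-in.
    if (conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
      // MRClientService takes this branch and refreshes the RPC server's
      // service ACLs from MRAMPolicyProvider before server.start().
      System.out.println("service-level authorization enabled");
    }
  }
}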

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Wed Nov  2 05:34:31 2011
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -68,5 +69,10 @@ public interface Job {
   TaskAttemptCompletionEvent[]
       getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
 
+  /**
+   * @return information for MR AppMasters (previously failed and current)
+   */
+  List<AMInfo> getAMInfos();
+  
   boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Wed Nov  2 05:34:31 2011
@@ -58,6 +58,11 @@ public interface TaskAttempt {
    * @return node's http address if a container is assigned, otherwise null.
    */
   String getNodeHttpAddress();
+  
+  /**
+   * @return node's rack name if a container is assigned, otherwise null.
+   */
+  String getNodeRackName();
 
   /** 
    * @return time at which container is launched. If container is not launched

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java Wed Nov  2 05:34:31 2011
@@ -18,22 +18,29 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.event;
 
+import java.util.Map;
+
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 
-
-
 public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
 
   private final Container container;
+  private final Map<ApplicationAccessType, String> applicationACLs;
 
   public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
-      Container container) {
+      Container container, Map<ApplicationAccessType, String> applicationACLs) {
     super(id, TaskAttemptEventType.TA_ASSIGNED);
     this.container = container;
+    this.applicationACLs = applicationACLs;
   }
 
   public Container getContainer() {
     return this.container;
   }
+
+  public Map<ApplicationAccessType, String> getApplicationACLs() {
+    return this.applicationACLs;
+  }
 }
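
The event now carries the application ACLs alongside the assigned container,
so the launch path can ship view/modify ACLs to the node together with the
attempt. A construction sketch; the principal names and the attemptId and
container variables are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationAccessType;

public class AclMapSketch {
  public static void main(String[] args) {
    // The ACL map the event now transports with the container.
    Map<ApplicationAccessType, String> acls =
        new HashMap<ApplicationAccessType, String>();
    acls.put(ApplicationAccessType.VIEW_APP, "alice,analytics-group");
    acls.put(ApplicationAccessType.MODIFY_APP, "alice");
    // With a real TaskAttemptId and Container in hand:
    // new TaskAttemptContainerAssignedEvent(attemptId, container, acls);
    System.out.println(acls);
  }
}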

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java Wed Nov  2 05:34:31 2011
@@ -48,7 +48,6 @@ public class TaskAttemptStatusUpdateEven
     public TaskAttemptId id;
     public float progress;
     public Counters counters;
-    public String diagnosticInfo;
     public String stateString;
     public Phase phase;
     public long outputSize;

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Nov  2 05:34:31 2011
@@ -39,15 +39,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -63,7 +60,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@@ -97,14 +94,11 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -125,21 +119,21 @@ public class JobImpl implements org.apac
 
   // Maximum no. of fetch-failure notifications after which map task is failed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
   
   //final fields
   private final ApplicationAttemptId applicationAttemptId;
   private final Clock clock;
   private final JobACLsManager aclsManager;
   private final String username;
+  private final OutputCommitter committer;
   private final Map<JobACL, AccessControlList> jobACLs;
   private final Set<TaskId> completedTasksFromPreviousRun;
+  private final List<AMInfo> amInfos;
   private final Lock readLock;
   private final Lock writeLock;
   private final JobId jobId;
   private final String jobName;
+  private final boolean newApiCommitter;
   private final org.apache.hadoop.mapreduce.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
   private final Object tasksSyncHandle = new Object();
@@ -148,6 +142,7 @@ public class JobImpl implements org.apac
   private final EventHandler eventHandler;
   private final MRAppMetrics metrics;
   private final String userName;
+  private final long appSubmitTime;
 
   private boolean lazyTasksCopyNeeded = false;
   private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@@ -164,7 +159,6 @@ public class JobImpl implements org.apac
   private Path remoteJobSubmitDir;
   public Path remoteJobConfFile;
   private JobContext jobContext;
-  private OutputCommitter committer;
   private int allowedMapFailuresPercent = 0;
   private int allowedReduceFailuresPercent = 0;
   private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@@ -339,7 +333,6 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
-
           // create the topology tables
           .installTopology();
  
@@ -355,7 +348,6 @@ public class JobImpl implements org.apac
   private int failedReduceTaskCount = 0;
   private int killedMapTaskCount = 0;
   private int killedReduceTaskCount = 0;
-  private long submitTime;
   private long startTime;
   private long finishTime;
   private float setupProgress;
@@ -366,29 +358,27 @@ public class JobImpl implements org.apac
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
 
-  public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
-      EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+  public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
+      Configuration conf, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
-      Credentials fsTokenCredentials, Clock clock, 
+      Credentials fsTokenCredentials, Clock clock,
       Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
-      String userName) {
+      OutputCommitter committer, boolean newApiCommitter, String userName,
+      long appSubmitTime, List<AMInfo> amInfos) {
     this.applicationAttemptId = applicationAttemptId;
-    this.jobId = recordFactory.newRecordInstance(JobId.class);
+    this.jobId = jobId;
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
     this.conf = conf;
     this.metrics = metrics;
     this.clock = clock;
     this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
+    this.amInfos = amInfos;
     this.userName = userName;
-    ApplicationId applicationId = applicationAttemptId.getApplicationId();
-    jobId.setAppId(applicationId);
-    jobId.setId(applicationId.getId());
-    oldJobId = TypeConverter.fromYarn(jobId);
-    LOG.info("Job created" +
-    		" appId=" + applicationId + 
-    		" jobId=" + jobId + 
-    		" oldJobId=" + oldJobId);
-    
+    this.appSubmitTime = appSubmitTime;
+    this.oldJobId = TypeConverter.fromYarn(jobId);
+    this.newApiCommitter = newApiCommitter;
+
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -397,6 +387,7 @@ public class JobImpl implements org.apac
 
     this.fsTokens = fsTokenCredentials;
     this.jobTokenSecretManager = jobTokenSecretManager;
+    this.committer = committer;
 
     this.aclsManager = new JobACLsManager(conf);
     this.username = System.getProperty("user.name");
@@ -589,13 +580,14 @@ public class JobImpl implements org.apac
 
       if (getState() == JobState.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
-            startTime, finishTime, setupProgress, 0.0f,
-            0.0f, cleanupProgress, remoteJobConfFile.toString());
+            appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
+            cleanupProgress, remoteJobConfFile.toString(), amInfos);
       }
 
       return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
-          startTime, finishTime, setupProgress, computeProgress(mapTasks),
-          computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString());
+          appSubmitTime, startTime, finishTime, setupProgress,
+          computeProgress(mapTasks), computeProgress(reduceTasks),
+          cleanupProgress, remoteJobConfFile.toString(), amInfos);
     } finally {
       readLock.unlock();
     }
@@ -724,6 +716,16 @@ public class JobImpl implements org.apac
     this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));    
   }
   
+  /**
+   * Create the default file system for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+  
   static JobState checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
     if (job.completedTaskCount == job.getTasks().size()) {
@@ -733,7 +735,6 @@ public class JobImpl implements org.apac
       } catch (IOException e) {
         LOG.warn("Could not do commit for Job", e);
       }
-      
       job.logJobHistoryFinishedEvent();
       return job.finished(JobState.SUCCEEDED);
     }
@@ -798,6 +799,11 @@ public class JobImpl implements org.apac
   public Map<JobACL, AccessControlList> getJobACLs() {
     return Collections.unmodifiableMap(jobACLs);
   }
+  
+  @Override
+  public List<AMInfo> getAMInfos() {
+    return amInfos;
+  }
 
   public static class InitTransition 
       implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@@ -811,18 +817,17 @@ public class JobImpl implements org.apac
      */
     @Override
     public JobState transition(JobImpl job, JobEvent event) {
-      job.submitTime = job.clock.getTime();
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
       try {
         setup(job);
-        job.fs = FileSystem.get(job.conf);
+        job.fs = job.getFileSystem(job.conf);
 
         //log to job history
         JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
               job.conf.get(MRJobConfig.JOB_NAME, "test"), 
             job.conf.get(MRJobConfig.USER_NAME, "mapred"),
-            job.submitTime,
+            job.appSubmitTime,
             job.remoteJobConfFile.toString(),
             job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default"));
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
@@ -838,60 +843,30 @@ public class JobImpl implements org.apac
 
         checkTaskLimits();
 
-        
-        boolean newApiCommitter = false;
-        if ((job.numReduceTasks > 0 && 
-            job.conf.getBoolean("mapred.reducer.new-api", false)) ||
-              (job.numReduceTasks == 0 && 
-               job.conf.getBoolean("mapred.mapper.new-api", false)))  {
-          newApiCommitter = true;
-          LOG.info("Using mapred newApiCommitter.");
-        }
-        
-        LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
-        
-        if (newApiCommitter) {
+        if (job.newApiCommitter) {
           job.jobContext = new JobContextImpl(job.conf,
               job.oldJobId);
-          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
-              .getRecordFactory(null)
-              .newRecordInstance(
-                  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
-          attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
-              .newRecordInstance(TaskId.class));
-          attemptID.getTaskId().setJobId(job.jobId);
-          attemptID.getTaskId().setTaskType(TaskType.MAP);
-          TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
-              TypeConverter.fromYarn(attemptID));
-          try {
-            OutputFormat outputFormat = ReflectionUtils.newInstance(
-                taskContext.getOutputFormatClass(), job.conf);
-            job.committer = outputFormat.getOutputCommitter(taskContext);
-          } catch(Exception e) {
-            throw new IOException("Failed to assign outputcommitter", e);
-          }
         } else {
           job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
               new JobConf(job.conf), job.oldJobId);
-          job.committer = ReflectionUtils.newInstance(
-              job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
-              org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
         }
-        LOG.info("OutputCommitter is " + job.committer.getClass().getName());
         
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
         }
 
-//FIXME:  need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs)
+        //FIXME:  need new memory criterion for uber-decision (oops, too late here; 
+        // until AM-resizing supported, must depend on job client to pass fat-slot needs)
         // these are no longer "system" settings, necessarily; user may override
         int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
         int sysMaxReduces =
             job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
         long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
-            job.conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
-        //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
+            job.conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is 
+        // wrong; get FS from [File?]InputFormat and default block size from that
+        //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); 
+        // FIXME [could use default AM-container memory size...]
 
         boolean uberEnabled =
             job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@@ -900,8 +875,8 @@ public class JobImpl implements org.apac
         boolean smallInput = (inputLength <= sysMaxBytes);
         boolean smallMemory = true;  //FIXME (see above)
             // ignoring overhead due to UberTask and statics as negligible here:
-//  FIXME   && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
-//              || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
+        //  FIXME   && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
+        //              || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
         boolean notChainJob = !isChainJob(job.conf);
 
         // User has overall veto power over uberization, or user can modify
@@ -935,7 +910,9 @@ public class JobImpl implements org.apac
           job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
 
           // disable speculation:  makes no sense to speculate an entire job
-//        canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()]
+          //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old 
+          //version, ultimately was from conf.getMapSpeculativeExecution(), 
+          //conf.getReduceSpeculativeExecution()]
         } else {
           StringBuilder msg = new StringBuilder();
           msg.append("Not uberizing ").append(job.jobId).append(" because:");
@@ -1022,13 +999,6 @@ public class JobImpl implements org.apac
       if (UserGroupInformation.isSecurityEnabled()) {
         tokenStorage.addAll(job.fsTokens);
       }
-
-      Path remoteJobTokenFile =
-          new Path(job.remoteJobSubmitDir,
-              MRJobConfig.APPLICATION_TOKENS_FILE);
-      tokenStorage.writeTokenStorageFile(remoteJobTokenFile, job.conf);
-      LOG.info("Writing back the job-token file on the remote file system:"
-          + remoteJobTokenFile.toString());
     }
 
     /**
@@ -1138,7 +1108,7 @@ public class JobImpl implements org.apac
              job.isUber()); //Will transition to state running. Currently in INITED
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
       JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
-          job.submitTime, job.startTime);
+          job.appSubmitTime, job.startTime);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
       job.metrics.runningJob(job);
 

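For readers tracking the uber-task logic that the hunks above reflow, here is a
self-contained sketch of the decision they encode. The MRJobConfig keys and
their defaults are taken from the hunk itself; shouldUberize, its parameters,
and the isChainJob flag are illustrative stand-ins for the JobImpl internals,
not the actual method:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    // Sketch only: mirrors the uberization predicate visible in the hunk above.
    public class UberDecisionSketch {
      static boolean shouldUberize(Configuration conf, int numMapTasks,
          int numReduceTasks, long inputLength, boolean isChainJob) {
        // no longer necessarily "system" settings; the user may override
        int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
        int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
        long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
            conf.getLong("dfs.block.size", 64 * 1024 * 1024));

        boolean uberEnabled =
            conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
        boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
        boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
        boolean smallInput = (inputLength <= sysMaxBytes);
        boolean smallMemory = true; // memory criterion is still a FIXME above
        boolean notChainJob = !isChainJob;

        return uberEnabled && smallNumMapTasks && smallNumReduceTasks
            && smallInput && smallMemory && notChainJob;
      }
    }
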
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Nov  2 05:34:31 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -53,7 +52,6 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
@@ -98,16 +96,18 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -116,9 +116,12 @@ import org.apache.hadoop.yarn.state.Inva
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+
 /**
  * Implementation of TaskAttempt interface.
  */
@@ -156,6 +159,8 @@ public abstract class TaskAttemptImpl im
   private long finishTime;
   private WrappedProgressSplitsBlock progressSplitBlock;
   private int shufflePort = -1;
+  private String trackerName;
+  private int httpPort;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -421,9 +426,10 @@ public abstract class TaskAttemptImpl im
     stateMachine;
 
   private ContainerId containerID;
-  private String nodeHostName;
+  private NodeId containerNodeId;
   private String containerMgrAddress;
   private String nodeHttpAddress;
+  private String nodeRackName;
   private WrappedJvmID jvmID;
   private ContainerToken containerToken;
   private Resource assignedCapability;
@@ -434,6 +440,9 @@ public abstract class TaskAttemptImpl im
   
   //this is the last status reported by the REMOTE running attempt
   private TaskAttemptStatus reportedStatus;
+  
+  private static final String LINE_SEPARATOR = System
+      .getProperty("line.separator");
 
   public TaskAttemptImpl(TaskId taskId, int i, 
       @SuppressWarnings("rawtypes") EventHandler eventHandler,
@@ -526,8 +535,10 @@ public abstract class TaskAttemptImpl im
 
   /**
    * Create the {@link ContainerLaunchContext} for this attempt.
+   * @param applicationACLs 
    */
-  private ContainerLaunchContext createContainerLaunchContext() {
+  private ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs) {
 
     // Application resources
     Map<String, LocalResource> localResources = 
@@ -611,7 +622,7 @@ public abstract class TaskAttemptImpl im
       serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
           ShuffleHandler.serializeServiceData(jobToken));
 
-      MRApps.addToEnvironment(
+      Apps.addToEnvironment(
           environment,  
           Environment.CLASSPATH.name(), 
           getInitialClasspath());
@@ -628,17 +639,11 @@ public abstract class TaskAttemptImpl im
         jvmID);
     
     // Construct the actual Container
-    ContainerLaunchContext container =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    container.setContainerId(containerID);
-    container.setUser(conf.get(MRJobConfig.USER_NAME));
-    container.setResource(assignedCapability);
-    container.setLocalResources(localResources);
-    container.setEnvironment(environment);
-    container.setCommands(commands);
-    container.setServiceData(serviceData);
-    container.setContainerTokens(tokens);
-    
+    ContainerLaunchContext container = BuilderUtils
+        .newContainerLaunchContext(containerID, conf
+            .get(MRJobConfig.USER_NAME), assignedCapability, localResources,
+            environment, commands, serviceData, tokens, applicationACLs);
+
     return container;
   }
 
@@ -723,6 +728,19 @@ public abstract class TaskAttemptImpl im
       readLock.unlock();
     }
   }
+  
+  /**
+   * If a container is assigned, return the node's rack name; otherwise null.
+   */
+  @Override
+  public String getNodeRackName() {
+    this.readLock.lock();
+    try {
+      return this.nodeRackName;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 
   protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
 
@@ -758,10 +776,16 @@ public abstract class TaskAttemptImpl im
       result.setStartTime(launchTime);
       result.setFinishTime(finishTime);
       result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
-      result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
+      result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
       result.setPhase(reportedStatus.phase);
       result.setStateString(reportedStatus.stateString);
       result.setCounters(getCounters());
+      result.setContainerId(this.getAssignedContainerID());
+      result.setNodeManagerHost(trackerName);
+      result.setNodeManagerHttpPort(httpPort);
+      if (this.containerNodeId != null) {
+        result.setNodeManagerPort(this.containerNodeId.getPort());
+      }
       return result;
     } finally {
       readLock.unlock();
@@ -855,7 +879,7 @@ public abstract class TaskAttemptImpl im
   private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     int slotMemoryReq =
-        taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+       taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
     int simSlotsRequired =
         slotMemoryReq
             / (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
@@ -888,15 +912,20 @@ public abstract class TaskAttemptImpl im
     return jce;
   }
 
-  private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
-      TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
-    TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent(
-        TypeConverter.fromYarn(taskAttempt.attemptId),
-        TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-        attemptState.toString(), taskAttempt.finishTime,
-        taskAttempt.nodeHostName == null ? "UNKNOWN" : taskAttempt.nodeHostName,
-        taskAttempt.reportedStatus.diagnosticInfo.toString(),
-        taskAttempt.getProgressSplitBlock().burst());
+  private static
+      TaskAttemptUnsuccessfulCompletionEvent
+      createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
+          TaskAttemptState attemptState) {
+    TaskAttemptUnsuccessfulCompletionEvent tauce =
+        new TaskAttemptUnsuccessfulCompletionEvent(
+            TypeConverter.fromYarn(taskAttempt.attemptId),
+            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
+                .getTaskType()), attemptState.toString(),
+            taskAttempt.finishTime,
+            taskAttempt.containerMgrAddress == null ? "UNKNOWN"
+                : taskAttempt.containerMgrAddress, StringUtils.join(
+                LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
+                .getProgressSplitBlock().burst());
     return tauce;
   }
 
@@ -988,17 +1017,19 @@ public abstract class TaskAttemptImpl im
 
   private static class ContainerAssignedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "deprecation" })
     @Override
     public void transition(final TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
-      TaskAttemptContainerAssignedEvent cEvent = 
+      final TaskAttemptContainerAssignedEvent cEvent = 
         (TaskAttemptContainerAssignedEvent) event;
       taskAttempt.containerID = cEvent.getContainer().getId();
-      taskAttempt.nodeHostName = cEvent.getContainer().getNodeId().getHost();
-      taskAttempt.containerMgrAddress = cEvent.getContainer().getNodeId()
+      taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
+      taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
           .toString();
       taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
+      taskAttempt.nodeRackName = RackResolver.resolve(
+          taskAttempt.containerNodeId.getHost()).getNetworkLocation();
       taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
       taskAttempt.assignedCapability = cEvent.getContainer().getResource();
       // this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1006,6 +1037,7 @@ public abstract class TaskAttemptImpl im
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+      taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
       
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -1015,7 +1047,8 @@ public abstract class TaskAttemptImpl im
               taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
         @Override
         public ContainerLaunchContext getContainer() {
-          return taskAttempt.createContainerLaunchContext();
+          return taskAttempt.createContainerLaunchContext(cEvent
+              .getApplicationACLs());
         }
         @Override
         public Task getRemoteTask() {  // classic mapred Task, not YARN version
@@ -1095,14 +1128,18 @@ public abstract class TaskAttemptImpl im
 
       //set the launch time
       taskAttempt.launchTime = taskAttempt.clock.getTime();
+      taskAttempt.shufflePort = event.getShufflePort();
+
       // register it to TaskAttemptListener so that it start listening
       // for it
-      taskAttempt.taskAttemptListener.register(
+      taskAttempt.taskAttemptListener.registerLaunchedTask(
           taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
                                                                   // Costly?
+      taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+      taskAttempt.httpPort = nodeHttpInetAddr.getPort();
       JobCounterUpdateEvent jce =
           new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
               .getJobId());
@@ -1112,11 +1149,15 @@ public abstract class TaskAttemptImpl im
               , 1);
       taskAttempt.eventHandler.handle(jce);
       
+      LOG.info("TaskAttempt: [" + taskAttempt.attemptId
+          + "] using containerId: [" + taskAttempt.containerID + "] on NM: ["
+          + taskAttempt.containerMgrAddress + "]");
       TaskAttemptStartedEvent tase =
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             taskAttempt.launchTime,
-            nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort());
+            nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
+            taskAttempt.shufflePort, taskAttempt.containerID);
       taskAttempt.eventHandler.handle
           (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
       taskAttempt.eventHandler.handle
@@ -1125,7 +1166,6 @@ public abstract class TaskAttemptImpl im
       //make remoteTask reference as null as it is no more needed
       //and free up the memory
       taskAttempt.remoteTask = null;
-      taskAttempt.shufflePort = event.getShufflePort();
       
       //tell the Task that attempt has started
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
@@ -1152,6 +1192,7 @@ public abstract class TaskAttemptImpl im
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
+      @SuppressWarnings("deprecation")
       TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(new JobConf(taskAttempt.conf),
             TypeConverter.fromYarn(taskAttempt.attemptId));
@@ -1229,7 +1270,10 @@ public abstract class TaskAttemptImpl im
          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
          state.toString(),
          this.reportedStatus.mapFinishTime,
-         finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName,
+         finishTime,
+         this.containerNodeId == null ? "UNKNOWN"
+             : this.containerNodeId.getHost(),
+         this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
          this.reportedStatus.stateString,
          TypeConverter.fromYarn(getCounters()),
          getProgressSplitBlock().burst());
@@ -1242,7 +1286,10 @@ public abstract class TaskAttemptImpl im
          state.toString(),
          this.reportedStatus.shuffleFinishTime,
          this.reportedStatus.sortFinishTime,
-         finishTime, this.containerMgrAddress == null ? "UNKNOWN" : this.containerMgrAddress,
+         finishTime,
+         this.containerNodeId == null ? "UNKNOWN"
+                                         : this.containerNodeId.getHost(),
+         this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
          this.reportedStatus.stateString,
          TypeConverter.fromYarn(getCounters()),
          getProgressSplitBlock().burst());
@@ -1352,8 +1399,6 @@ public abstract class TaskAttemptImpl im
           (new SpeculatorEvent
               (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
       
-      //add to diagnostic
-      taskAttempt.addDiagnosticInfo(newReportedStatus.diagnosticInfo);
       taskAttempt.updateProgressSplits();
       
       //if fetch failures are present, send the fetch failure event to job
@@ -1381,7 +1426,6 @@ public abstract class TaskAttemptImpl im
 
   private void initTaskAttemptStatus(TaskAttemptStatus result) {
     result.progress = 0.0f;
-    result.diagnosticInfo = "";
     result.phase = Phase.STARTING;
     result.stateString = "NEW";
     result.taskState = TaskAttemptState.NEW;

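The diagnostics changes above drop the mutable reportedStatus.diagnosticInfo
string in favor of joining the attempt's diagnostics list with the platform
line separator. A minimal sketch of that pattern, with sample strings standing
in for getDiagnostics(), might be:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.util.StringUtils;

    // Sketch of the join pattern used by setDiagnosticInfo(...) above.
    public class DiagnosticsJoinSketch {
      private static final String LINE_SEPARATOR =
          System.getProperty("line.separator");

      public static void main(String[] args) {
        // stand-in for taskAttempt.getDiagnostics()
        List<String> diagnostics = Arrays.asList(
            "Container killed by the framework",
            "Task exceeded virtual memory limits");
        // one diagnostic per line in the resulting report string
        String diagnosticInfo = StringUtils.join(LINE_SEPARATOR, diagnostics);
        System.out.println(diagnosticInfo);
      }
    }
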
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Nov  2 05:34:31 2011
@@ -441,10 +441,20 @@ public abstract class TaskImpl implement
     float progress = 0f;
     TaskAttempt result = null;
     for (TaskAttempt at : attempts.values()) {
+      switch (at.getState()) {
+      
+      // ignore all failed task attempts
+      case FAIL_CONTAINER_CLEANUP: 
+      case FAIL_TASK_CLEANUP: 
+      case FAILED: 
+      case KILL_CONTAINER_CLEANUP: 
+      case KILL_TASK_CLEANUP: 
+      case KILLED:
+        continue;      
+      }      
       if (result == null) {
         result = at; //The first time around
       }
-      //TODO: consider the nextAttemptNumber only if it is not failed/killed ?
       // calculate the best progress
       if (at.getProgress() > progress) {
         result = at;
@@ -496,7 +506,7 @@ public abstract class TaskImpl implement
         break;
         
       case 1:
-        Map newAttempts
+        Map<TaskAttemptId, TaskAttempt> newAttempts
             = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
         newAttempts.putAll(attempts);
         attempts = newAttempts;
@@ -558,7 +568,8 @@ public abstract class TaskImpl implement
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
     if (attempt.getNodeHttpAddress() != null) {
-      TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+      TaskAttemptCompletionEvent tce = recordFactory
+          .newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
       tce.setMapOutputServerAddress("http://"
           + attempt.getNodeHttpAddress().split(":")[0] + ":"

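The switch added above excludes failed and killed attempts from the
best-progress selection. A standalone sketch of the same loop, with simplified
State and Attempt types standing in for TaskAttemptState and TaskAttempt,
could read:

    import java.util.Arrays;
    import java.util.EnumSet;
    import java.util.List;

    // Sketch only: mirrors the selection loop in the hunk above.
    public class BestAttemptSketch {
      enum State {
        RUNNING, SUCCEEDED, FAILED, KILLED, FAIL_CONTAINER_CLEANUP,
        FAIL_TASK_CLEANUP, KILL_CONTAINER_CLEANUP, KILL_TASK_CLEANUP
      }

      static final EnumSet<State> IGNORED = EnumSet.of(
          State.FAIL_CONTAINER_CLEANUP, State.FAIL_TASK_CLEANUP, State.FAILED,
          State.KILL_CONTAINER_CLEANUP, State.KILL_TASK_CLEANUP, State.KILLED);

      static class Attempt {
        final State state;
        final float progress;
        Attempt(State state, float progress) {
          this.state = state;
          this.progress = progress;
        }
      }

      /** Best-progress attempt among the non-failed ones, or null. */
      static Attempt selectBestAttempt(List<Attempt> attempts) {
        Attempt result = null;
        float progress = 0f;
        for (Attempt at : attempts) {
          if (IGNORED.contains(at.state)) {
            continue; // ignore all failed and killed task attempts
          }
          if (result == null) {
            result = at; // the first time around
          }
          if (at.progress > progress) {
            result = at;
            progress = at.progress;
          }
        }
        return result;
      }

      public static void main(String[] args) {
        Attempt best = selectBestAttempt(Arrays.asList(
            new Attempt(State.FAILED, 0.9f), new Attempt(State.RUNNING, 0.4f)));
        System.out.println(best.state + " @ " + best.progress); // RUNNING @ 0.4
      }
    }
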
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Wed Nov  2 05:34:31 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 public interface ContainerLauncher 
@@ -28,4 +29,12 @@ public interface ContainerLauncher 
     CONTAINER_REMOTE_LAUNCH,
     CONTAINER_REMOTE_CLEANUP
   }
+
+  // Not a documented config. Only used for tests
+  static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
+      + "nm-command-timeout";
+  /**
+   * Maximum timeout (one minute) for a node to react to a command.
+   */
+  static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000;
 }
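
MR_AM_NM_COMMAND_TIMEOUT is deliberately undocumented and intended for tests,
so a hedged sketch of overriding the one-minute default might look like the
following; the driver class and the 3-second value are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;

    // Sketch only: shows how a test could shorten the NM command timeout.
    public class NmCommandTimeoutSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
        int timeoutMs = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
            ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
        System.out.println("NM command timeout (ms): " + timeoutMs);
      }
    }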