Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2011/09/29 02:42:55 UTC

svn commit: r1177130 [2/7] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apa...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Sep 29 00:42:47 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,7 +61,6 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@@ -103,6 +101,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -117,7 +116,6 @@ import org.apache.hadoop.yarn.state.Inva
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 
@@ -153,7 +151,7 @@ public abstract class TaskAttemptImpl im
   private Token<JobTokenIdentifier> jobToken;
   private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
   private static String initialClasspath = null;
-  private final Object classpathLock = new Object();
+  private static final Object classpathLock = new Object();
   private long launchTime;
   private long finishTime;
   private WrappedProgressSplitsBlock progressSplitBlock;
@@ -518,8 +516,8 @@ public abstract class TaskAttemptImpl im
         return initialClasspath;
       }
       Map<String, String> env = new HashMap<String, String>();
-      MRApps.setInitialClasspath(env);
-      initialClasspath = env.get(MRApps.CLASSPATH);
+      MRApps.setClasspath(env);
+      initialClasspath = env.get(Environment.CLASSPATH.name());
       initialClasspathFlag.set(true);
       return initialClasspath;
     }
@@ -531,16 +529,18 @@ public abstract class TaskAttemptImpl im
    */
   private ContainerLaunchContext createContainerLaunchContext() {
 
-    ContainerLaunchContext container =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
     // Application resources
     Map<String, LocalResource> localResources = 
         new HashMap<String, LocalResource>();
     
     // Application environment
     Map<String, String> environment = new HashMap<String, String>();
-    
+
+    // Service data
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    // Tokens
+    ByteBuffer tokens = ByteBuffer.wrap(new byte[]{});
     try {
       FileSystem remoteFS = FileSystem.get(conf);
 
@@ -550,7 +550,7 @@ public abstract class TaskAttemptImpl im
               MRJobConfig.JAR))).makeQualified(remoteFS.getUri(), 
                                                remoteFS.getWorkingDirectory());
         localResources.put(
-            MRConstants.JOB_JAR,
+            MRJobConfig.JOB_JAR,
             createLocalResource(remoteFS, recordFactory, remoteJobJar,
                 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
         LOG.info("The job-jar file on the remote FS is "
@@ -570,9 +570,9 @@ public abstract class TaskAttemptImpl im
       Path remoteJobSubmitDir =
           new Path(path, oldJobId.toString());
       Path remoteJobConfPath = 
-          new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
+          new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
       localResources.put(
-          MRConstants.JOB_CONF_FILE,
+          MRJobConfig.JOB_CONF_FILE,
           createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
               LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
       LOG.info("The job-conf file on the remote FS is "
@@ -580,12 +580,8 @@ public abstract class TaskAttemptImpl im
       // //////////// End of JobConf setup
 
       // Setup DistributedCache
-      MRApps.setupDistributedCache(conf, localResources, environment);
+      MRApps.setupDistributedCache(conf, localResources);
 
-      // Set local-resources and environment
-      container.setLocalResources(localResources);
-      container.setEnvironment(environment);
-      
      // Set up tokens
       Credentials taskCredentials = new Credentials();
 
@@ -606,52 +602,43 @@ public abstract class TaskAttemptImpl im
       LOG.info("Size of containertokens_dob is "
           + taskCredentials.numberOfTokens());
       taskCredentials.writeTokenStorageToStream(containerTokens_dob);
-      container.setContainerTokens(
+      tokens = 
           ByteBuffer.wrap(containerTokens_dob.getData(), 0,
-              containerTokens_dob.getLength()));
+              containerTokens_dob.getLength());
 
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
-      Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
       serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
           ShuffleHandler.serializeServiceData(jobToken));
-      container.setServiceData(serviceData);
 
-      MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
+      MRApps.addToEnvironment(
+          environment,  
+          Environment.CLASSPATH.name(), 
+          getInitialClasspath());
     } catch (IOException e) {
       throw new YarnException(e);
     }
-    
-    container.setContainerId(containerID);
-    container.setUser(conf.get(MRJobConfig.USER_NAME)); // TODO: Fix
-
-    File workDir = new File("$PWD"); // Will be expanded by the shell.
-    String containerLogDir =
-        new File(ApplicationConstants.LOG_DIR_EXPANSION_VAR).toString();
-    String childTmpDir = new File(workDir, "tmp").toString();
-    String javaHome = "${JAVA_HOME}"; // Will be expanded by the shell.
-    String nmLdLibraryPath = "{LD_LIBRARY_PATH}"; // Expanded by the shell?
-    List<String> classPaths = new ArrayList<String>();
-
-    String localizedApplicationTokensFile =
-        new File(workDir, MRConstants.APPLICATION_TOKENS_FILE).toString();
-    classPaths.add(MRConstants.JOB_JAR);
-    classPaths.add(MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
-    classPaths.add(workDir.toString()); // TODO
 
-    // Construct the actual Container
-    container.setCommands(MapReduceChildJVM.getVMCommand(
-        taskAttemptListener.getAddress(), remoteTask, javaHome,
-        workDir.toString(), containerLogDir, childTmpDir, jvmID));
-
-    MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
-        workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
-        localizedApplicationTokensFile);
+    // Setup environment
+    MapReduceChildJVM.setVMEnv(environment, remoteTask);
 
+    // Set up the launch command
+    List<String> commands = MapReduceChildJVM.getVMCommand(
+        taskAttemptListener.getAddress(), remoteTask,
+        jvmID);
+    
     // Construct the actual Container
+    ContainerLaunchContext container =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
     container.setContainerId(containerID);
     container.setUser(conf.get(MRJobConfig.USER_NAME));
     container.setResource(assignedCapability);
+    container.setLocalResources(localResources);
+    container.setEnvironment(environment);
+    container.setCommands(commands);
+    container.setServiceData(serviceData);
+    container.setContainerTokens(tokens);
+    
     return container;
   }
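
Taken together, this hunk reorders createContainerLaunchContext() so that all
four inputs (local resources, environment, service data, and container tokens)
are gathered first and the ContainerLaunchContext record is constructed exactly
once, at the end. A condensed sketch of the resulting assembly, using only the
setters visible above (all names assumed in scope as in the method):

    // Sketch only: mirrors the assembly order after this patch.
    ContainerLaunchContext container =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    container.setContainerId(containerID);
    container.setUser(conf.get(MRJobConfig.USER_NAME));
    container.setResource(assignedCapability);
    container.setLocalResources(localResources); // job.jar, job.xml, dist. cache
    container.setEnvironment(environment);       // CLASSPATH etc. via MRApps
    container.setCommands(commands);             // from MapReduceChildJVM.getVMCommand
    container.setServiceData(serviceData);       // shuffle token for the ShuffleHandler
    container.setContainerTokens(tokens);        // serialized task Credentials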
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Sep 29 00:42:47 2011
@@ -73,6 +73,8 @@ public class ContainerLauncherImpl exten
 
   private AppContext context;
   private ThreadPoolExecutor launcherPool;
+  private static final int INITIAL_POOL_SIZE = 10;
+  private int limitOnPoolSize;
   private Thread eventHandlingThread;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
@@ -96,16 +98,17 @@ public class ContainerLauncherImpl exten
         YarnConfiguration.YARN_SECURITY_INFO,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+    this.limitOnPoolSize = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     super.init(myLocalConfig);
   }
 
   public void start() {
-    launcherPool =
-        new ThreadPoolExecutor(getConfig().getInt(
-            MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT, 10),
-            Integer.MAX_VALUE, 1, TimeUnit.HOURS,
-            new LinkedBlockingQueue<Runnable>());
-    launcherPool.prestartAllCoreThreads(); // Wait for work.
+    // Start with a default core-pool size of 10 and change it dynamically.
+    launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
+        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>());
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -117,6 +120,26 @@ public class ContainerLauncherImpl exten
             LOG.error("Returning, interrupted : " + e);
             return;
           }
+
+          int poolSize = launcherPool.getCorePoolSize();
+
+          // See if we need to bump up the pool size, but only if we haven't
+          // reached the maximum limit yet.
+          if (poolSize != limitOnPoolSize) {
+
+            // Number of nodes where containers will run at *this* point in
+            // time. This is *not* the cluster size and doesn't need to be.
+            int numNodes = ugiMap.size();
+            int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
+
+            if (poolSize <= idealPoolSize) {
+              // Bump up the pool size to idealPoolSize + INITIAL_POOL_SIZE;
+              // the latter is just a buffer so we are not always increasing
+              // the pool size.
+              launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
+            }
+          }
+
           // the events from the queue are handled in parallel
           // using a thread pool
           launcherPool.execute(new EventProcessor(event));
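
The resize policy above grows the launcher pool lazily: it starts at
INITIAL_POOL_SIZE and, before each event is dispatched, is bumped toward
min(limitOnPoolSize, number of nodes currently hosting containers) plus a
buffer of INITIAL_POOL_SIZE so the pool is not resized on every event. Because
the work queue is unbounded, only the core size ever governs how many threads
run, which is why the patch adjusts the core pool size. A self-contained
sketch of the same policy (the limit and node count are made-up values here;
in the patch the node count is ugiMap.size()):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolResizeSketch {
      private static final int INITIAL_POOL_SIZE = 10;

      public static void main(String[] args) {
        int limitOnPoolSize = 500; // thread-count limit from configuration
        ThreadPoolExecutor launcherPool = new ThreadPoolExecutor(
            INITIAL_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.HOURS,
            new LinkedBlockingQueue<Runnable>());

        int numNodes = 37; // stand-in for ugiMap.size()
        int poolSize = launcherPool.getCorePoolSize();
        if (poolSize != limitOnPoolSize) {
          int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
          if (poolSize <= idealPoolSize) {
            // Grow past the ideal size by a fixed buffer so we do not
            // resize on every event.
            launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
          }
        }
        System.out.println("core pool size: "
            + launcherPool.getCorePoolSize()); // prints 47
        launcherPool.shutdown();
      }
    }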

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Thu Sep 29 00:42:47 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.local;
 
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -30,15 +31,19 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -66,6 +71,20 @@ public class LocalContainerAllocator ext
   }
 
   @Override
+  protected synchronized void heartbeat() throws Exception {
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        this.applicationAttemptId, this.lastResponseID, super
+            .getApplicationProgress(), new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>());
+    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AMResponse response = allocateResponse.getAMResponse();
+    if (response.getReboot()) {
+      // TODO
+      LOG.info("Event from RM: shutting down Application Master");
+    }
+  }
+
+  @Override
   public void handle(ContainerAllocatorEvent event) {
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       LOG.info("Processing the event " + event.toString());

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Sep 29 00:42:47 2011
@@ -58,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -92,10 +92,9 @@ public class RecoveryService extends Com
 
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
-  private final ApplicationId appID;
+  private final ApplicationAttemptId applicationAttemptId;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
-  private final int startCount;
 
   private JobInfo jobInfo = null;
   private final Map<TaskId, TaskInfo> completedTasks =
@@ -106,10 +105,10 @@ public class RecoveryService extends Com
 
   private volatile boolean recoveryMode = false;
 
-  public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
+  public RecoveryService(ApplicationAttemptId applicationAttemptId, 
+      Clock clock) {
     super("RecoveringDispatcher");
-    this.appID = appID;
-    this.startCount = startCount;
+    this.applicationAttemptId = applicationAttemptId;
     this.dispatcher = new RecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
@@ -152,7 +151,8 @@ public class RecoveryService extends Com
 
   private void parse() throws IOException {
     // TODO: parse history file based on startCount
-    String jobName = TypeConverter.fromYarn(appID).toString();
+    String jobName = 
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
     String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
     FSDataInputStream in = null;
     Path historyFile = null;
@@ -160,8 +160,9 @@ public class RecoveryService extends Com
         new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
         getConfig());
+    //read the previous history file
     historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, startCount - 1));          //read the previous history file
+        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
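
The recovery path now derives everything from the ApplicationAttemptId rather
than a separately passed startCount: attempt N parses the job history file
written by attempt N-1. A minimal sketch of the lookup, reusing only the calls
visible above (fc, histDirPath and jobName assumed in scope):

    // Attempt N recovers from the history file of attempt N-1.
    int attemptId = applicationAttemptId.getAttemptId();   // e.g. 2
    Path historyFile = fc.makeQualified(
        JobHistoryUtils.getStagingJobHistoryFile(
            histDirPath, jobName, attemptId - 1));         // previous attempt
    FSDataInputStream in = fc.open(historyFile);
    JobInfo jobInfo = new JobHistoryParser(in).parse();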

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Sep 29 00:42:47 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.io.IOException;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,7 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -42,17 +42,12 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -64,7 +59,7 @@ import org.apache.hadoop.yarn.service.Ab
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
  */
-public class RMCommunicator extends AbstractService  {
+public abstract class RMCommunicator extends AbstractService  {
   private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
@@ -74,7 +69,7 @@ public class RMCommunicator extends Abst
   protected EventHandler eventHandler;
   protected AMRMProtocol scheduler;
   private final ClientService clientService;
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource minContainerCapability;
   private Resource maxContainerCapability;
 
@@ -121,6 +116,34 @@ public class RMCommunicator extends Abst
     return job;
   }
 
+  /**
+   * Get the application progress. Can be used only after this component is
+   * started.
+   * @return the application progress.
+   */
+  protected float getApplicationProgress() {
+    // For now just a single job. In future when we have a DAG, we need an
+    // aggregate progress.
+    JobReport report = this.job.getReport();
+    float setupWeight = 0.05f;
+    float cleanupWeight = 0.05f;
+    float mapWeight = 0.0f;
+    float reduceWeight = 0.0f;
+    int numMaps = this.job.getTotalMaps();
+    int numReduces = this.job.getTotalReduces();
+    if (numMaps == 0 && numReduces == 0) {
+    } else if (numMaps == 0) {
+      reduceWeight = 0.9f;
+    } else if (numReduces == 0) {
+      mapWeight = 0.9f;
+    } else {
+      mapWeight = reduceWeight = 0.45f;
+    }
+    return (report.getSetupProgress() * setupWeight
+        + report.getCleanupProgress() * cleanupWeight
+        + report.getMapProgress() * mapWeight + report.getReduceProgress()
+        * reduceWeight);
+  }
+
   protected void register() {
     //Register
     String host = 
@@ -262,18 +285,5 @@ public class RMCommunicator extends Abst
     });
   }
 
-  protected synchronized void heartbeat() throws Exception {
-    AllocateRequest allocateRequest =
-        recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(applicationAttemptId);
-    allocateRequest.setResponseId(lastResponseID);
-    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>());
-    allocateRequest.addAllReleases(new ArrayList<ContainerId>());
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
-    if (response.getReboot()) {
-      LOG.info("Event from RM: shutting down Application Master");
-    }
-  }
-
+  protected abstract void heartbeat() throws Exception;
 }
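
getApplicationProgress() computes a fixed-weight blend: setup and cleanup
always count for 5% each, and the remaining 90% goes to the phases the job
actually has (all of it to reduces for a reduce-only job, all of it to maps
for a map-only job, 45%/45% otherwise). A self-contained sketch of the same
arithmetic:

    public class ProgressWeightSketch {
      // Mirrors the weighting in RMCommunicator.getApplicationProgress().
      static float progress(float setup, float cleanup, float map,
          float reduce, int numMaps, int numReduces) {
        float mapWeight = 0.0f;
        float reduceWeight = 0.0f;
        if (numMaps == 0 && numReduces == 0) {
          // No tasks at all: only setup/cleanup contribute.
        } else if (numMaps == 0) {
          reduceWeight = 0.9f;
        } else if (numReduces == 0) {
          mapWeight = 0.9f;
        } else {
          mapWeight = reduceWeight = 0.45f;
        }
        return setup * 0.05f + cleanup * 0.05f
            + map * mapWeight + reduce * reduceWeight;
      }

      public static void main(String[] args) {
        // Setup done, maps half done, reduces not started:
        System.out.println(progress(1.0f, 0.0f, 0.5f, 0.0f, 10, 2)); // 0.275
      }
    }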

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Sep 29 00:42:47 2011
@@ -586,37 +586,21 @@ public class RMContainerAllocator extend
     private ContainerRequest assign(Container allocated) {
       ContainerRequest assigned = null;
       
-      if (mapResourceReqt != reduceResourceReqt) {
-        //assign based on size
-        LOG.info("Assigning based on container size");
-        if (allocated.getResource().getMemory() == mapResourceReqt) {
-          assigned = assignToFailedMap(allocated);
-          if (assigned == null) {
-            assigned = assignToMap(allocated);
-          }
-        } else if (allocated.getResource().getMemory() == reduceResourceReqt) {
-          assigned = assignToReduce(allocated);
-        }
-        
-        return assigned;
-      }
-      
-      //container can be given to either map or reduce
-      //assign based on priority
-      
-      //try to assign to earlierFailedMaps if present
-      assigned = assignToFailedMap(allocated);
-      
-      //Assign to reduces before assigning to maps ?
-      if (assigned == null) {
+      Priority priority = allocated.getPriority();
+      if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
+        LOG.info("Assigning container " + allocated + " to fast fail map");
+        assigned = assignToFailedMap(allocated);
+      } else if (PRIORITY_REDUCE.equals(priority)) {
+        LOG.info("Assigning container " + allocated + " to reduce");
         assigned = assignToReduce(allocated);
-      }
-      
-      //try to assign to maps if present
-      if (assigned == null) {
+      } else if (PRIORITY_MAP.equals(priority)) {
+        LOG.info("Assigning container " + allocated + " to map");
         assigned = assignToMap(allocated);
+      } else {
+        LOG.warn("Container allocated at unwanted priority: " + priority + 
+            ". Returning to RM...");
       }
-      
+        
       return assigned;
     }
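
Before this hunk, assign() inferred map-vs-reduce from the container's memory
size, which is ambiguous whenever mapResourceReqt equals reduceResourceReqt;
now it routes on the Priority the container was requested at, which is always
unambiguous. A toy sketch of the new dispatch (the enum stands in for the
PRIORITY_* constants defined elsewhere in this class):

    class AssignSketch {
      enum RequestPriority { FAST_FAIL_MAP, REDUCE, MAP }

      static String route(RequestPriority p) {
        switch (p) {
          case FAST_FAIL_MAP: return "failed maps first";
          case REDUCE:        return "reduces";
          case MAP:           return "maps";
        }
        return "unwanted priority: log and return the container to the RM";
      }
    }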
     

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Sep 29 00:42:47 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /**
  * Keeps the data structures to send container requests to RM.
@@ -107,15 +108,11 @@ public abstract class RMContainerRequest
     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
   }
 
-  protected abstract void heartbeat() throws Exception;
-
   protected AMResponse makeRemoteRequest() throws YarnRemoteException {
-    AllocateRequest allocateRequest = recordFactory
-        .newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(applicationAttemptId);
-    allocateRequest.setResponseId(lastResponseID);
-    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
-    allocateRequest.addAllReleases(new ArrayList<ContainerId>(release));
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        applicationAttemptId, lastResponseID, super.getApplicationProgress(),
+        new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
+            release));
     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
     AMResponse response = allocateResponse.getAMResponse();
     lastResponseID = response.getResponseId();
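
Both heartbeat paths (LocalContainerAllocator earlier in this commit and
makeRemoteRequest here) now build their AllocateRequest through the same
BuilderUtils.newAllocateRequest factory, and both report
getApplicationProgress() so the RM sees live application progress. A sketch of
the shared call shape, using only the signature visible in this patch (all
names as in the method above):

    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
        applicationAttemptId,                 // which AM attempt is asking
        lastResponseID,                       // last response id we saw
        getApplicationProgress(),             // the 0..1 blend computed above
        new ArrayList<ResourceRequest>(ask),  // outstanding container asks
        new ArrayList<ContainerId>(release)); // containers being given back
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    AMResponse response = allocateResponse.getAMResponse();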

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Sep 29 00:42:47 2011
@@ -35,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -87,7 +86,7 @@ public class DefaultSpeculator extends A
   private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
       = new ConcurrentHashMap<JobId, AtomicInteger>();
 
-  private final Set<TaskId> mayHaveSpeculated = new HashSet();
+  private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>();
 
   private final Configuration conf;
   private AppContext context;

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobConfPage.java Thu Sep 29 00:42:47 2011
@@ -44,6 +44,7 @@ public class JobConfPage extends AppView
     set(TITLE, jobID.isEmpty() ? "Bad request: missing job ID"
         : join("Configuration for MapReduce Job ", $(JOB_ID)));
     commonPreHead(html);
+    set(initID(ACCORDION, "nav"), "{autoHeight:false, active:2}");
     set(DATATABLES_ID, "conf");
     set(initID(DATATABLES, "conf"), confTableInit());
     set(postInitID(DATATABLES, "conf"), confPostTableInit());

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java Thu Sep 29 00:42:47 2011
@@ -38,9 +38,9 @@ public class NavBlock extends HtmlBlock 
       div("#nav").
         h3("Cluster").
         ul().
-          li().a(url(rmweb, prefix(), "cluster"), "About")._().
-          li().a(url(rmweb, prefix(), "apps"), "Applications")._().
-          li().a(url(rmweb, prefix(), "scheduler"), "Scheduler")._()._().
+          li().a(url(rmweb, "cluster", "cluster"), "About")._().
+          li().a(url(rmweb, "cluster", "apps"), "Applications")._().
+          li().a(url(rmweb, "cluster", "scheduler"), "Scheduler")._()._().
         h3("Application").
         ul().
           li().a(url("app/info"), "About")._().

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Thu Sep 29 00:42:47 2011
@@ -85,7 +85,7 @@ public class TaskPage extends AppView {
         if (containerId != null) {
           String containerIdStr = ConverterUtils.toString(containerId);
           nodeTd._(" ").
-            a(".logslink", url("http://", nodeHttpAddr, "yarn", "containerlogs",
+            a(".logslink", url("http://", nodeHttpAddr, "node", "containerlogs",
               containerIdStr), "logs");
         }
         nodeTd._().

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Sep 29 00:42:47 2011
@@ -66,6 +66,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -91,7 +92,7 @@ public class MRApp extends MRAppMaster {
   private File testWorkDir;
   private Path testAbsPath;
 
-  private final RecordFactory recordFactory =
+  private static final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
   //if true, tasks complete automatically as soon as they are launched
@@ -100,7 +101,7 @@ public class MRApp extends MRAppMaster {
   static ApplicationId applicationId;
 
   static {
-    applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+    applicationId = recordFactory.newRecordInstance(ApplicationId.class);
     applicationId.setClusterTimestamp(0);
     applicationId.setId(0);
   }
@@ -108,9 +109,19 @@ public class MRApp extends MRAppMaster {
   public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
     this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
   }
+  
+  private static ApplicationAttemptId getApplicationAttemptId(
+      ApplicationId applicationId, int startCount) {
+    ApplicationAttemptId applicationAttemptId =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    applicationAttemptId.setApplicationId(applicationId);
+    applicationAttemptId.setAttemptId(startCount);
+    return applicationAttemptId;
+  }
 
-  public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
-    super(applicationId, startCount);
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName, 
+      boolean cleanOnStart, int startCount) {
+    super(getApplicationAttemptId(applicationId, startCount));
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -391,11 +402,12 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
-    public TestJob(Configuration conf, ApplicationId appID,
+    public TestJob(Configuration conf, ApplicationId applicationId,
         EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
         Clock clock, String user) {
-      super(appID, conf, eventHandler, taskAttemptListener,
-          new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), 
+      super(getApplicationAttemptId(applicationId, getStartCount()), 
+          conf, eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials(), clock, 
           getCompletedTaskFromPreviousRun(), metrics, user);
 
       // This "this leak" is okay because the retained pointer is in an
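
With MRAppMaster now constructed from an ApplicationAttemptId instead of an
(ApplicationId, startCount) pair, the test harness synthesizes the attempt id
through the record factory. A hypothetical usage of the helper added above:

    // startCount from the old constructor becomes the attempt id.
    ApplicationAttemptId attempt = getApplicationAttemptId(applicationId, 2);
    assert attempt.getAttemptId() == 2;
    assert attempt.getApplicationId().equals(applicationId);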

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Sep 29 00:42:47 2011
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import junit.framework.Assert;
@@ -32,475 +35,651 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
 import org.junit.Test;
 
 public class TestRMContainerAllocator {
-//  private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
-//  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-//
-//  @BeforeClass
-//  public static void preTests() {
-//    DefaultMetricsSystem.shutdown();
-//  }
-//
-//  @Test
-//  public void testSimple() throws Exception {
-//    FifoScheduler scheduler = createScheduler();
-//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-//        scheduler, new Configuration());
-//
-//    //add resources to scheduler
-//    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-//    //create the container request
-//    ContainerRequestEvent event1 = 
-//      createReq(1, 1024, new String[]{"h1"});
-//    allocator.sendRequest(event1);
-//
-//    //send 1 more request with different resource req
-//    ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"});
-//    allocator.sendRequest(event2);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //send another request with different resource and priority
-//    ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"});
-//    allocator.sendRequest(event3);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //update resources in scheduler
-//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//
-//    assigned = allocator.schedule();
-//    checkAssignments(
-//        new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
-//  }
-//
-//  //TODO: Currently Scheduler seems to have bug where it does not work
-//  //for Application asking for containers with different capabilities.
-//  //@Test
-//  public void testResource() throws Exception {
-//    FifoScheduler scheduler = createScheduler();
-//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-//        scheduler, new Configuration());
-//
-//    //add resources to scheduler
-//    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-//    //create the container request
-//    ContainerRequestEvent event1 = 
-//      createReq(1, 1024, new String[]{"h1"});
-//    allocator.sendRequest(event1);
-//
-//    //send 1 more request with different resource req
-//    ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"});
-//    allocator.sendRequest(event2);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //update resources in scheduler
-//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//    assigned = allocator.schedule();
-//    checkAssignments(
-//        new ContainerRequestEvent[]{event1, event2}, assigned, false);
-//  }
-//
-//  @Test
-//  public void testMapReduceScheduling() throws Exception {
-//    FifoScheduler scheduler = createScheduler();
-//    Configuration conf = new Configuration();
-//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-//        scheduler, conf);
-//
-//    //add resources to scheduler
-//    RMNode nodeManager1 = addNode(scheduler, "h1", 1024);
-//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-//    //create the container request
-//    //send MAP request
-//    ContainerRequestEvent event1 = 
-//      createReq(1, 2048, new String[]{"h1", "h2"}, true, false);
-//    allocator.sendRequest(event1);
-//
-//    //send REDUCE request
-//    ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true);
-//    allocator.sendRequest(event2);
-//
-//    //send MAP request
-//    ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false);
-//    allocator.sendRequest(event3);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //update resources in scheduler
-//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//    assigned = allocator.schedule();
-//    checkAssignments(
-//        new ContainerRequestEvent[]{event1, event3}, assigned, false);
-//
-//    //validate that no container is assigned to h1 as it doesn't have 2048
-//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
-//      Assert.assertFalse("Assigned count not correct", 
-//          "h1".equals(assig.getContainer().getNodeId().getHost()));
-//    }
-//  }
-//
-//
-//
-//  private RMNode addNode(FifoScheduler scheduler, 
-//      String nodeName, int memory) {
-//    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-//    nodeId.setHost(nodeName);
-//    nodeId.setPort(1234);
-//    Resource resource = recordFactory.newRecordInstance(Resource.class);
-//    resource.setMemory(memory);
-//    RMNode nodeManager = new RMNodeImpl(nodeId, null, nodeName, 0, 0,
-//        ResourceTrackerService.resolve(nodeName), resource);
-//    scheduler.addNode(nodeManager); // Node registration
-//    return nodeManager;
-//  }
-//
-//  private FifoScheduler createScheduler() throws YarnRemoteException {
-//    FifoScheduler fsc = new FifoScheduler() {
-//      //override this to copy the objects
-//      //otherwise FifoScheduler updates the numContainers in same objects as kept by
-//      //RMContainerAllocator
-//      
-//      @Override
-//      public synchronized void allocate(ApplicationAttemptId applicationId,
-//          List<ResourceRequest> ask) {
-//        List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
-//        for (ResourceRequest req : ask) {
-//          ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class);
-//          reqCopy.setPriority(req.getPriority());
-//          reqCopy.setHostName(req.getHostName());
-//          reqCopy.setCapability(req.getCapability());
-//          reqCopy.setNumContainers(req.getNumContainers());
-//          askCopy.add(reqCopy);
-//        }
-//        super.allocate(applicationId, askCopy);
-//      }
-//    };
-//    try {
-//      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
-//      fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
-//          recordFactory.newRecordInstance(ApplicationMaster.class),
-//          "test", null, null, StoreFactory.createVoidAppStore());
-//    } catch(IOException ie) {
-//      LOG.info("add application failed with ", ie);
-//      assert(false);
-//    }
-//    return fsc;
-//  }
-//
-//  private ContainerRequestEvent createReq(
-//      int attemptid, int memory, String[] hosts) {
-//    return createReq(attemptid, memory, hosts, false, false);
-//  }
-//  
-//  private ContainerRequestEvent createReq(
-//      int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
-//    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
-//    appId.setClusterTimestamp(0);
-//    appId.setId(0);
-//    JobId jobId = recordFactory.newRecordInstance(JobId.class);
-//    jobId.setAppId(appId);
-//    jobId.setId(0);
-//    TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
-//    taskId.setId(0);
-//    taskId.setJobId(jobId);
-//    if (reduce) {
-//      taskId.setTaskType(TaskType.REDUCE);
-//    } else {
-//      taskId.setTaskType(TaskType.MAP);
-//    }
-//    TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
-//    attemptId.setId(attemptid);
-//    attemptId.setTaskId(taskId);
-//    Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
-//    containerNeed.setMemory(memory);
-//    if (earlierFailedAttempt) {
-//      return ContainerRequestEvent.
-//           createContainerRequestEventForFailedContainer(attemptId, containerNeed);
-//    }
-//    return new ContainerRequestEvent(attemptId, 
-//        containerNeed, 
-//        hosts, new String[] {NetworkTopology.DEFAULT_RACK});
-//  }
-//
-//  private void checkAssignments(ContainerRequestEvent[] requests, 
-//      List<TaskAttemptContainerAssignedEvent> assignments, 
-//      boolean checkHostMatch) {
-//    Assert.assertNotNull("Container not assigned", assignments);
-//    Assert.assertEquals("Assigned count not correct", 
-//        requests.length, assignments.size());
-//
-//    //check for uniqueness of containerIDs
-//    Set<ContainerId> containerIds = new HashSet<ContainerId>();
-//    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
-//      containerIds.add(assigned.getContainer().getId());
-//    }
-//    Assert.assertEquals("Assigned containers must be different", 
-//        assignments.size(), containerIds.size());
-//
-//    //check for all assignment
-//    for (ContainerRequestEvent req : requests) {
-//      TaskAttemptContainerAssignedEvent assigned = null;
-//      for (TaskAttemptContainerAssignedEvent ass : assignments) {
-//        if (ass.getTaskAttemptID().equals(req.getAttemptID())){
-//          assigned = ass;
-//          break;
-//        }
-//      }
-//      checkAssignment(req, assigned, checkHostMatch);
-//    }
-//  }
-//
-//  private void checkAssignment(ContainerRequestEvent request, 
-//      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
-//    Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
-//        assigned);
-//    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
-//        assigned.getTaskAttemptID());
-//    if (checkHostMatch) {
-//      Assert.assertTrue("Not assigned to requested host", Arrays.asList(
-//          request.getHosts()).contains(
-//          assigned.getContainer().getNodeId().toString()));
-//    }
-//
-//  }
-//
-//  //Mock RMContainerAllocator
-//  //Instead of talking to remote Scheduler,uses the local Scheduler
-//  public static class LocalRMContainerAllocator extends RMContainerAllocator {
-//    private static final List<TaskAttemptContainerAssignedEvent> events = 
-//      new ArrayList<TaskAttemptContainerAssignedEvent>();
-//
-//    public static class AMRMProtocolImpl implements AMRMProtocol {
-//
-//      private ResourceScheduler resourceScheduler;
-//
-//      public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
-//        this.resourceScheduler = resourceScheduler;
-//      }
-//
-//      @Override
-//      public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
-//        RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
-//        return response;
-//      }
-//
-//      public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
-//        List<ResourceRequest> ask = request.getAskList();
-//        List<Container> release = request.getReleaseList();
-//        try {
-//          AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
-//          Allocation allocation = resourceScheduler.allocate(request.getApplicationAttemptId(), ask);
-//          response.addAllNewContainers(allocation.getContainers());
-//          response.setAvailableResources(allocation.getResourceLimit());
-//          AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
-//          allocateResponse.setAMResponse(response);
-//          return allocateResponse;
-//        } catch(IOException ie) {
-//          throw RPCUtil.getRemoteException(ie);
-//        }
-//      }
-//
-//      @Override
-//      public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
-//        FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
-//        return response;
-//      }
-//
-//    }
-//
-//    private ResourceScheduler scheduler;
-//    LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) {
-//      super(null, new TestContext(events));
-//      this.scheduler = scheduler;
-//      super.init(conf);
-//      super.start();
-//    }
-//
-//    protected AMRMProtocol createSchedulerProxy() {
-//      return new AMRMProtocolImpl(scheduler);
-//    }
-//
-//    @Override
-//    protected void register() {}
-//    @Override
-//    protected void unregister() {}
-//
-//    @Override
-//    protected Resource getMinContainerCapability() {
-//      Resource res = recordFactory.newRecordInstance(Resource.class);
-//      res.setMemory(1024);
-//      return res;
-//    }
-//    
-//    @Override
-//    protected Resource getMaxContainerCapability() {
-//      Resource res = recordFactory.newRecordInstance(Resource.class);
-//      res.setMemory(10240);
-//      return res;
-//    }
-//    
-//    public void sendRequest(ContainerRequestEvent req) {
-//      sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
-//    }
-//
-//    public void sendRequests(List<ContainerRequestEvent> reqs) {
-//      for (ContainerRequestEvent req : reqs) {
-//        handle(req);
-//      }
-//    }
-//
-//    //API to be used by tests
-//    public List<TaskAttemptContainerAssignedEvent> schedule() {
-//      //run the scheduler
-//      try {
-//        heartbeat();
-//      } catch (Exception e) {
-//        LOG.error("error in heartbeat ", e);
-//        throw new YarnException(e);
-//      }
-//
-//      List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);
-//      events.clear();
-//      return result;
-//    }
-//
-//    protected void startAllocatorThread() {
-//      //override to NOT start thread
-//    }
-//
-//    static class TestContext implements AppContext {
-//      private List<TaskAttemptContainerAssignedEvent> events;
-//      TestContext(List<TaskAttemptContainerAssignedEvent> events) {
-//        this.events = events;
-//      }
-//      @Override
-//      public Map<JobId, Job> getAllJobs() {
-//        return null;
-//      }
-//      @Override
-//      public ApplicationAttemptId getApplicationAttemptId() {
-//        return recordFactory.newRecordInstance(ApplicationAttemptId.class);
-//      }
-//      @Override
-//      public ApplicationId getApplicationID() {
-//        return recordFactory.newRecordInstance(ApplicationId.class);
-//      }
-//      @Override
-//      public EventHandler getEventHandler() {
-//        return new EventHandler() {
-//          @Override
-//          public void handle(Event event) {
-//            events.add((TaskAttemptContainerAssignedEvent) event);
-//          }
-//        };
-//      }
-//      @Override
-//      public Job getJob(JobId jobID) {
-//        return null;
-//      }
-//
-//      @Override
-//      public String getUser() {
-//        return null;
-//      }
-//
-//      @Override
-//      public Clock getClock() {
-//        return null;
-//      }
-//
-//      @Override
-//      public String getApplicationName() {
-//        return null;
-//      }
-//
-//      @Override
-//      public long getStartTime() {
-//        return 0;
-//      }
-//    }
-//  }
-//
-//  public static void main(String[] args) throws Exception {
-//    TestRMContainerAllocator t = new TestRMContainerAllocator();
-//    t.testSimple();
-//    //t.testResource();
-//    t.testMapReduceScheduling();
-//  }
+
+  static final Log LOG = LogFactory
+      .getLog(TestRMContainerAllocator.class);
+  static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @After
+  public void tearDown() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  @Test
+  public void testSimple() throws Exception {
+
+    LOG.info("Running testSimple");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    // send one more request, same resources but for a different host
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event2);
+
+    // this tells the scheduler about the requests;
+    // the nodes have not heartbeated yet, so nothing is allocated
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // send a third request for yet another host
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h3" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests;
+    // the nodes have not heartbeated yet, so nothing is allocated
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+        assigned, false);
+  }
+
+  @Test
+  public void testResource() throws Exception {
+
+    LOG.info("Running testResource");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    // send 1 more request with different resource req
+    ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
+        new String[] { "h2" });
+    allocator.sendRequest(event2);
+
+    // this tells the scheduler about the requests;
+    // the nodes have not heartbeated yet, so nothing is allocated
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+        assigned, false);
+  }
+
+  @Test
+  public void testMapReduceScheduling() throws Exception {
+
+    LOG.info("Running testMapReduceScheduling");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
+        "h1", "h2" }, true, false);
+    allocator.sendRequest(event1);
+
+    // send REDUCE request
+    ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
+        new String[] { "h1" }, false, true);
+    allocator.sendRequest(event2);
+
+    // send MAP request
+    ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
+        new String[] { "h3" }, false, false);
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests;
+    // the nodes have not heartbeated yet, so nothing is allocated
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+        assigned, false);
+
+    // validate that no container is assigned to h1, which lacks the
+    // requested 2048MB
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      Assert.assertFalse("Container must not be assigned to h1",
+          "h1".equals(assig.getContainer().getNodeId().getHost()));
+    }
+  }
+
+  private static class MyResourceManager extends MockRM {
+
+    public MyResourceManager(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected Dispatcher createDispatcher() {
+      return new DrainDispatcher();
+    }
+
+    @Override
+    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+      // Dispatch inline for test sanity
+      return new EventHandler<SchedulerEvent>() {
+        @Override
+        public void handle(SchedulerEvent event) {
+          scheduler.handle(event);
+        }
+      };
+    }
+
+    @Override
+    protected ResourceScheduler createScheduler() {
+      return new MyFifoScheduler(getRMContext());
+    }
+  }
+
+  private static class FakeJob extends JobImpl {
+
+    private float setupProgress;
+    private float mapProgress;
+    private float reduceProgress;
+    private float cleanupProgress;
+    private final int numMaps;
+    private final int numReduces;
+    private final JobId jobId;
+
+    public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
+        int numMaps, int numReduces) {
+      super(appAttemptID, conf, null, null, null, null, null, null, null,
+          null);
+      this.jobId = MRBuilderUtils
+          .newJobId(appAttemptID.getApplicationId(), 0);
+      this.numMaps = numMaps;
+      this.numReduces = numReduces;
+    }
+
+    void setProgress(float setupProgress, float mapProgress,
+        float reduceProgress, float cleanupProgress) {
+      this.setupProgress = setupProgress;
+      this.mapProgress = mapProgress;
+      this.reduceProgress = reduceProgress;
+      this.cleanupProgress = cleanupProgress;
+    }
+
+    @Override
+    public int getTotalMaps() { return this.numMaps; }
+    @Override
+    public int getTotalReduces() { return this.numReduces;}
+
+    @Override
+    public JobReport getReport() {
+      return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
+          JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
+          this.reduceProgress, this.cleanupProgress);
+    }
+  }
+
+  @Test
+  public void testReportedAppProgress() throws Exception {
+
+    LOG.info("Running testReportedAppProgress");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    FakeJob job = new FakeJob(appAttemptId, conf, 2, 2);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, job);
+
+    allocator.schedule(); // Send heartbeat
+    dispatcher.await();
+    Assert.assertEquals(0.0, app.getProgress(), 0.0);
+
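+    // Expected values below follow the AM's progress weighting as implied
+    // by these assertions (inferred, not spelled out in this commit):
+    // setup 5% + maps 45% + reduces 45% + cleanup 5%.
+    // E.g. setup=100, map=10 gives 5 + 0.45 * 10 = 9.5.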
+    job.setProgress(100, 10, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(9.5f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 80, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(41.0f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 100, 20, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 100, 100, 100);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+  }
+
+  @Test
+  public void testReportedAppProgressWithOnlyMaps() throws Exception {
+
+    LOG.info("Running testReportedAppProgressWithOnlyMaps");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    FakeJob job = new FakeJob(appAttemptId, conf, 2, 0);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, job);
+
+    allocator.schedule(); // Send heartbeat
+    dispatcher.await();
+    Assert.assertEquals(0.0, app.getProgress(), 0.0);
+
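+    // With no reduces, the implied weighting shifts to
+    // setup 5% + maps 90% + cleanup 5% (again inferred from the
+    // assertions): setup=100, map=10 gives 5 + 0.9 * 10 = 14.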
+    job.setProgress(100, 10, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(14f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 60, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 100, 0, 100);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+  }
+
+  private static class MyFifoScheduler extends FifoScheduler {
+
+    public MyFifoScheduler(RMContext rmContext) {
+      super();
+      try {
+        reinitialize(new Configuration(), new ContainerTokenSecretManager(),
+            rmContext);
+      } catch (IOException ie) {
+        LOG.info("add application failed with ", ie);
+        assert (false);
+      }
+    }
+
+    // Override to copy the request objects; otherwise FifoScheduler updates
+    // numContainers in the same objects that RMContainerAllocator holds
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release) {
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req
+            .getPriority(), req.getHostName(), req.getCapability(), req
+            .getNumContainers());
+        askCopy.add(reqCopy);
+      }
+      return super.allocate(applicationAttemptId, askCopy, release);
+    }
+  }
+
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int memory, String[] hosts) {
+    return createReq(jobId, taskAttemptId, memory, hosts, false, false);
+  }
+
+  private ContainerRequestEvent
+      createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
+          boolean earlierFailedAttempt, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+    Resource containerNeed = BuilderUtils.newResource(memory);
+    if (earlierFailedAttempt) {
+      return ContainerRequestEvent
+          .createContainerRequestEventForFailedContainer(attemptId,
+              containerNeed);
+    }
+    return new ContainerRequestEvent(attemptId, containerNeed, hosts,
+        new String[] { NetworkTopology.DEFAULT_RACK });
+  }
+
+  private void checkAssignments(ContainerRequestEvent[] requests,
+      List<TaskAttemptContainerAssignedEvent> assignments,
+      boolean checkHostMatch) {
+    Assert.assertNotNull("Container not assigned", assignments);
+    Assert.assertEquals("Assigned count not correct", requests.length,
+        assignments.size());
+
+    // check for uniqueness of containerIDs
+    Set<ContainerId> containerIds = new HashSet<ContainerId>();
+    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
+      containerIds.add(assigned.getContainer().getId());
+    }
+    Assert.assertEquals("Assigned containers must be different", assignments
+        .size(), containerIds.size());
+
+    // check for all assignment
+    for (ContainerRequestEvent req : requests) {
+      TaskAttemptContainerAssignedEvent assigned = null;
+      for (TaskAttemptContainerAssignedEvent ass : assignments) {
+        if (ass.getTaskAttemptID().equals(req.getAttemptID())) {
+          assigned = ass;
+          break;
+        }
+      }
+      checkAssignment(req, assigned, checkHostMatch);
+    }
+  }
+
+  private void checkAssignment(ContainerRequestEvent request,
+      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+    Assert.assertNotNull("Nothing assigned to attempt "
+        + request.getAttemptID(), assigned);
+    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+        assigned.getTaskAttemptID());
+    if (checkHostMatch) {
+      Assert.assertTrue("Not assigned to requested host", Arrays.asList(
+          request.getHosts()).contains(
+          assigned.getContainer().getNodeId().toString()));
+    }
+  }
+
+  // Mock RMContainerAllocator.
+  // Instead of talking to a remote scheduler, it uses the local scheduler.
+  private static class MyContainerAllocator extends RMContainerAllocator {
+    static final List<TaskAttemptContainerAssignedEvent> events
+      = new ArrayList<TaskAttemptContainerAssignedEvent>();
+
+    private MyResourceManager rm;
+
+    @SuppressWarnings("rawtypes")
+    private static AppContext createAppContext(
+        ApplicationAttemptId appAttemptId, Job job) {
+      AppContext context = mock(AppContext.class);
+      ApplicationId appId = appAttemptId.getApplicationId();
+      when(context.getApplicationID()).thenReturn(appId);
+      when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
+      when(context.getJob(isA(JobId.class))).thenReturn(job);
+      when(context.getEventHandler()).thenReturn(new EventHandler() {
+        @Override
+        public void handle(Event event) {
+          // Only capture interesting events.
+          if (event instanceof TaskAttemptContainerAssignedEvent) {
+            events.add((TaskAttemptContainerAssignedEvent) event);
+          }
+        }
+      });
+      return context;
+    }
+
+    private static ClientService createMockClientService() {
+      ClientService service = mock(ClientService.class);
+      when(service.getBindAddress()).thenReturn(
+          NetUtils.createSocketAddr("localhost:4567"));
+      when(service.getHttpPort()).thenReturn(890);
+      return service;
+    }
+
+    MyContainerAllocator(MyResourceManager rm, Configuration conf,
+        ApplicationAttemptId appAttemptId, Job job) {
+      super(createMockClientService(), createAppContext(appAttemptId, job));
+      this.rm = rm;
+      super.init(conf);
+      super.start();
+    }
+
+    @Override
+    protected AMRMProtocol createSchedulerProxy() {
+      return this.rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected void register() {
+      super.register();
+    }
+
+    @Override
+    protected void unregister() {
+    }
+
+    @Override
+    protected Resource getMinContainerCapability() {
+      return BuilderUtils.newResource(1024);
+    }
+
+    @Override
+    protected Resource getMaxContainerCapability() {
+      return BuilderUtils.newResource(10240);
+    }
+
+    public void sendRequest(ContainerRequestEvent req) {
+      sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+    }
+
+    public void sendRequests(List<ContainerRequestEvent> reqs) {
+      for (ContainerRequestEvent req : reqs) {
+        super.handle(req);
+      }
+    }
+
+    // API to be used by tests
+    public List<TaskAttemptContainerAssignedEvent> schedule() {
+      // run the scheduler
+      try {
+        super.heartbeat();
+      } catch (Exception e) {
+        LOG.error("error in heartbeat ", e);
+        throw new YarnException(e);
+      }
+
+      List<TaskAttemptContainerAssignedEvent> result
+        = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+      events.clear();
+      return result;
+    }
+
+    @Override
+    protected void startAllocatorThread() {
+      // override to NOT start thread
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestRMContainerAllocator t = new TestRMContainerAllocator();
+    t.testSimple();
+    t.testResource();
+    t.testMapReduceScheduling();
+    t.testReportedAppProgress();
+    t.testReportedAppProgressWithOnlyMaps();
+  }
 }

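The rewritten tests above lean on DrainDispatcher so that every asynchronous RM event is fully processed before an assertion runs. Below is a minimal, self-contained sketch of that pattern; the TestEventType/TestEvent names are invented for illustration, and it assumes DrainDispatcher's usual AbstractService lifecycle (init/start/stop) from the YARN event utilities.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.event.AbstractEvent;
    import org.apache.hadoop.yarn.event.DrainDispatcher;
    import org.apache.hadoop.yarn.event.EventHandler;

    public class DrainDispatcherSketch {

      // Hypothetical event type, invented for this sketch.
      enum TestEventType { PING }

      static class TestEvent extends AbstractEvent<TestEventType> {
        TestEvent() { super(TestEventType.PING); }
      }

      public static void main(String[] args) throws Exception {
        DrainDispatcher dispatcher = new DrainDispatcher();
        dispatcher.register(TestEventType.class,
            new EventHandler<TestEvent>() {
              @Override
              public void handle(TestEvent event) {
                System.out.println("handled " + event.getType());
              }
            });
        dispatcher.init(new Configuration());
        dispatcher.start();

        // Events are handled on the dispatcher thread; await() blocks until
        // the queue drains, which is why the tests call dispatcher.await()
        // after every submitApp/registerNode/nodeHeartbeat step.
        dispatcher.getEventHandler().handle(new TestEvent());
        dispatcher.await();
        dispatcher.stop();
      }
    }
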
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Thu Sep 29 00:42:47 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -280,15 +281,27 @@ public class TypeConverter {
   }
   
   public static org.apache.hadoop.mapred.JobStatus fromYarn(
-      JobReport jobreport, String jobFile, String trackingUrl) {
+      JobReport jobreport, String jobFile) {
     JobPriority jobPriority = JobPriority.NORMAL;
-    return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
-        jobreport.getSetupProgress(), jobreport.getMapProgress(),
-        jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
-        fromYarn(jobreport.getJobState()),
-        jobPriority, jobreport.getUser(), jobreport.getJobName(),
-        jobFile, trackingUrl);
+    org.apache.hadoop.mapred.JobStatus jobStatus =
+        new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
+            jobreport.getSetupProgress(), jobreport.getMapProgress(),
+            jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
+            fromYarn(jobreport.getJobState()),
+            jobPriority, jobreport.getUser(), jobreport.getJobName(),
+            jobFile, jobreport.getTrackingUrl());
+    jobStatus.setFailureInfo(jobreport.getDiagnostics());
+    return jobStatus;
+  }
+  
+  public static org.apache.hadoop.mapreduce.QueueState fromYarn(
+      QueueState state) {
+    org.apache.hadoop.mapreduce.QueueState qState = 
+      org.apache.hadoop.mapreduce.QueueState.getState(
+        state.toString().toLowerCase());
+    return qState;
   }
+
   
   public static int fromYarn(JobState state) {
     switch (state) {
@@ -412,6 +425,7 @@ public class TypeConverter {
       );
     jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
     jobStatus.setStartTime(application.getStartTime());
+    jobStatus.setFailureInfo(application.getDiagnostics());
     return jobStatus;
   }
 
@@ -431,9 +445,9 @@ public class TypeConverter {
   
   public static QueueInfo fromYarn(org.apache.hadoop.yarn.api.records.QueueInfo 
       queueInfo, Configuration conf) {
-    return new QueueInfo(queueInfo.getQueueName(), 
-        queueInfo.toString(), QueueState.RUNNING, 
-        TypeConverter.fromYarnApps(queueInfo.getApplications(), conf));
+    return new QueueInfo(queueInfo.getQueueName(), queueInfo.toString(),
+        fromYarn(queueInfo.getQueueState()), TypeConverter.fromYarnApps(
+        queueInfo.getApplications(), conf));
   }
   
   public static QueueInfo[] fromYarnQueueInfo(

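For reference, the new fromYarn(QueueState) overload maps by lowercased enum name. A small sketch of what a caller sees, assuming QueueState.RUNNING exists on the YARN side as imported above:

    // Sketch: the new TypeConverter.fromYarn(QueueState) overload in use.
    org.apache.hadoop.yarn.api.records.QueueState yarnState =
        org.apache.hadoop.yarn.api.records.QueueState.RUNNING;
    org.apache.hadoop.mapreduce.QueueState mrState =
        TypeConverter.fromYarn(yarnState);
    // "RUNNING".toLowerCase() is "running", so mrState resolves to
    // org.apache.hadoop.mapreduce.QueueState.RUNNING via getState("running").
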
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java Thu Sep 29 00:42:47 2011
@@ -29,6 +29,8 @@ public interface JobReport {
   public abstract long getFinishTime();
   public abstract String getUser();
   public abstract String getJobName();
+  public abstract String getTrackingUrl();
+  public abstract String getDiagnostics();
 
   public abstract void setJobId(JobId jobId);
   public abstract void setJobState(JobState jobState);
@@ -40,4 +42,6 @@ public interface JobReport {
   public abstract void setFinishTime(long finishTime);
   public abstract void setUser(String user);
   public abstract void setJobName(String jobName);
+  public abstract void setTrackingUrl(String trackingUrl);
+  public abstract void setDiagnostics(String diagnostics);
 }
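
Taken together with the TypeConverter change above, the two new JobReport fields flow straight into the client-visible JobStatus. A hedged sketch follows, using the same RecordFactory lookup the test code in this commit already uses; the URL and diagnostic strings are placeholders, not values from the source:

    // Sketch: exercising the trackingUrl/diagnostics fields added above.
    JobReport report = RecordFactoryProvider.getRecordFactory(null)
        .newRecordInstance(JobReport.class);
    report.setTrackingUrl("host:8088/proxy/application_0_0000"); // placeholder
    report.setDiagnostics("example failure diagnostics");        // placeholder

    // fromYarn(JobReport, String) now reads the tracking URL from the
    // report itself (the trackingUrl parameter was dropped) and copies the
    // diagnostics into JobStatus.setFailureInfo(...).
    org.apache.hadoop.mapred.JobStatus status =
        TypeConverter.fromYarn(report, "jobFile");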