You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/03/24 08:52:36 UTC

svn commit: r1084866 [1/2] - in /hadoop/mapreduce/branches/MR-279/yarn: yarn-api/src/main/avro/ yarn-common/src/main/java/org/apache/hadoop/yarn/conf/ yarn-server/yarn-server-common/src/main/resources/ yarn-server/yarn-server-resourcemanager/src/main/j...

Author: mahadev
Date: Thu Mar 24 07:52:35 2011
New Revision: 1084866

URL: http://svn.apache.org/viewvc?rev=1084866&view=rev
Log:
Restart/Cleanup an application on an AM failure for a configurable number of times contributed by Mahadev Konar

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
Removed:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterTracker.java
Modified:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/avro/yarn-types.genavro Thu Mar 24 07:52:35 2011
@@ -48,6 +48,7 @@ protocol types {
     PENDING,
     ALLOCATING,
    	ALLOCATED,
+   	EXPIRED_PENDING,
    	LAUNCHING,
    	LAUNCHED,
     RUNNING,

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Mar 24 07:52:35 2011
@@ -31,6 +31,11 @@ public class YarnConfiguration extends C
   public static final String AM_EXPIRY_INTERVAL = RM_PREFIX
   + "application.expiry.interval";
   
+  public static final String AM_MAX_RETRIES = RM_PREFIX 
+  + "application.max.retries";
+  
+  public static final int DEFAULT_AM_MAX_RETRIES = 3;
+  
   public static final long DEFAULT_AM_EXPIRY_INTERVAL = 60000L;
 
   public static final String NM_EXPIRY_INTERVAL = RM_PREFIX 
@@ -92,7 +97,7 @@ public class YarnConfiguration extends C
       "AppClientTokenEnv";
 
   public static final String RESOURCE_SCHEDULER = RM_PREFIX + "scheduler";
-  
+ 
   public YarnConfiguration() {
     super();
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml Thu Mar 24 07:52:35 2011
@@ -30,6 +30,12 @@
   </property>
 
   <property>
+    <name>yarn.server.resourcemanager.application.max.retries</name>
+    <value>1</value>
+    <description>The number of times an application will be retried in case
+    of AM failure.</description>
+  </property>
+  <property>
     <name>yarn.server.resourcemanager.keytab</name>
     <value>/etc/krb5.keytab</value>
   </property>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Thu Mar 24 07:52:35 2011
@@ -29,18 +29,18 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.ClientRMProtocol;
-import org.apache.hadoop.yarn.YarnClusterMetrics;
 
 /**
  * The client interface to the Resource Manager. This module handles all the rpc
@@ -48,14 +48,14 @@ import org.apache.hadoop.yarn.YarnCluste
  */
 public class ClientRMService extends AbstractService implements ClientRMProtocol {
   private static final Log LOG = LogFactory.getLog(ClientRMService.class);
-  private RMResourceTracker clusterInfo;
+  private ResourceContext clusterInfo;
   private ApplicationsManager applicationsManager;
   private String clientServiceBindAddress;
   private Server server;
   InetSocketAddress clientBindAddress;
   
   public ClientRMService(ApplicationsManager applicationsManager, 
-        RMResourceTracker clusterInfo) {
+        ResourceContext clusterInfo) {
     super(ClientRMService.class.getName());
     this.clusterInfo = clusterInfo;
     this.applicationsManager = applicationsManager;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Mar 24 07:52:35 2011
@@ -32,7 +32,9 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.SyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
@@ -101,7 +103,8 @@ public class ResourceManager extends Com
               FifoScheduler.class, ResourceScheduler.class), 
           this.conf);
     this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager);
-    
+    /* add the scheduler to be notified of events from the applications managers */
+    this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, this.scheduler);
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
         .createSecretKey("Dummy".getBytes()));
@@ -109,8 +112,7 @@ public class ResourceManager extends Com
     applicationsManager = createApplicationsManagerImpl();
     addService(applicationsManager);
     
-    rmResourceTracker = createRMResourceTracker();
-    rmResourceTracker.register(this.scheduler);
+    rmResourceTracker = createRMResourceTracker(this.scheduler);
     addService(rmResourceTracker);
     
     clientRM = createClientRMService();
@@ -165,8 +167,8 @@ public class ResourceManager extends Com
     super.stop();
   }
   
-  protected RMResourceTrackerImpl createRMResourceTracker() {
-    return new RMResourceTrackerImpl(this.containerTokenSecretManager);
+  protected RMResourceTrackerImpl createRMResourceTracker(ResourceListener listener) {
+    return new RMResourceTrackerImpl(this.containerTokenSecretManager, listener);
   }
   
   protected ApplicationsManagerImpl createApplicationsManagerImpl() {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Thu Mar 24 07:52:35 2011
@@ -59,7 +59,9 @@ public class AMTracker extends AbstractS
   private long amExpiryInterval; 
   @SuppressWarnings("rawtypes")
   private EventHandler handler;
-
+  
+  private int amMaxRetries;
+  
   private final ASMContext asmContext;
   
   private final Map<ApplicationID, ApplicationMasterInfo> applications = 
@@ -93,6 +95,10 @@ public class AMTracker extends AbstractS
     this.handler = asmContext.getDispatcher().getEventHandler();
     this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 
     YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
+    LOG.info("AM expiry interval: " + this.amExpiryInterval);
+    this.amMaxRetries =  conf.getInt(YarnConfiguration.AM_MAX_RETRIES, 
+        YarnConfiguration.DEFAULT_AM_MAX_RETRIES);
+    LOG.info("AM max retries: " + this.amMaxRetries);
     this.asmContext.getDispatcher().register(ApplicationEventType.class, this);
   }
 
@@ -161,7 +167,6 @@ public class AMTracker extends AbstractS
       synchronized (applications) {
         am = applications.get(app);
       }
-     
       handler.handle(new ASMEvent<ApplicationEventType>
           (ApplicationEventType.EXPIRE, am));
       }
@@ -208,6 +213,8 @@ public class AMTracker extends AbstractS
     return masterInfo;
   }
 
+  /* As of now we dont remove applications from the RM */
+  /* TODO we need to decide on a strategy for expiring done applications */
   public void remove(ApplicationID applicationId) {
     synchronized (applications) {
       applications.remove(applicationId);
@@ -297,6 +304,11 @@ public class AMTracker extends AbstractS
     public String getQueue() {
       throw notimplemented;
     }
+
+    @Override
+    public int getFailedCount() {
+     throw notimplemented;
+    }
   }
   
   public void heartBeat(ApplicationStatus status) {
@@ -347,5 +359,17 @@ public class AMTracker extends AbstractS
         amExpiryQueue.add(masterInfo.getStatus());
       }
     }
+    
+    /* check to see if the AM is an EXPIRED_PENDING state and start off the cycle again */
+    if (masterInfo.getState() == ApplicationState.EXPIRED_PENDING) {
+      /* check to see if the number of retries are reached or not */
+      if (masterInfo.getFailedCount() < this.amMaxRetries) {
+        handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATE,
+          masterInfo));
+      } else {
+        handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+              FAILED_MAX_RETRIES, masterInfo));
+      }
+    }
   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java Thu Mar 24 07:52:35 2011
@@ -91,4 +91,10 @@ public interface AppContext {
    * @return the queue for this application
    */
   public String getQueue();
+  
+  /**
+   * the count of number of times the AM has expired/failed.
+   * @return the count of number of times the AM has expired/failed.
+   */
+  public int getFailedCount();
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java Thu Mar 24 07:52:35 2011
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -55,12 +58,18 @@ public class ApplicationMasterInfo imple
   private final EventHandler handler;
   private Container masterContainer;
   final private String user;
+  private int numFailed = 0;
+  /* the list of nodes that this AM was launched on */
+  List<String> hostNamesLaunched = new ArrayList<String>();
   /* this transition is too generalized, needs to be broken up as and when we 
    * keeping adding states. This will keep evolving and is not final yet.
    */
   private final  KillTransition killTransition =  new KillTransition();
   private final StatusUpdateTransition statusUpdatetransition = new StatusUpdateTransition();
-
+  private final ExpireTransition expireTransition = new ExpireTransition();
+  private final FailedTransition failedTransition = new FailedTransition();
+  private final AllocateTransition allocateTransition = new AllocateTransition();
+  
   private final StateMachine<ApplicationState, ApplicationEventType, 
   ASMEvent<ApplicationEventType>> stateMachine;
 
@@ -72,8 +81,11 @@ public class ApplicationMasterInfo imple
   (ApplicationState.PENDING)
 
   .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
-  ApplicationEventType.ALLOCATE, new AllocateTransition())
-
+  ApplicationEventType.ALLOCATE, allocateTransition)
+  
+  .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.ALLOCATING, 
+  ApplicationEventType.ALLOCATE, allocateTransition)
+  
   .addTransition(ApplicationState.PENDING, ApplicationState.CLEANUP, 
   ApplicationEventType.KILL, killTransition)
 
@@ -99,14 +111,23 @@ public class ApplicationMasterInfo imple
   ApplicationEventType.KILL, killTransition)
   
   .addTransition(ApplicationState.LAUNCHED, ApplicationState.FAILED,
-  ApplicationEventType.EXPIRE, new ExpireTransition())
+  ApplicationEventType.EXPIRE, expireTransition)
   
   .addTransition(ApplicationState.LAUNCHED, ApplicationState.RUNNING, 
   ApplicationEventType.REGISTERED, new RegisterTransition())
-
-  .addTransition(ApplicationState.RUNNING,  ApplicationState.FAILED, 
-  ApplicationEventType.EXPIRE, new ExpireTransition())
   
+  /* for now we assume that acting on expiry is synchronous and we do not 
+   * have to wait for cleanup acks from scheduler negotiator and launcher.
+   */
+  .addTransition(ApplicationState.LAUNCHED, ApplicationState.EXPIRED_PENDING,
+      ApplicationEventType.EXPIRE, expireTransition)
+      
+  .addTransition(ApplicationState.RUNNING,  ApplicationState.EXPIRED_PENDING, 
+  ApplicationEventType.EXPIRE, expireTransition)
+  
+  .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.FAILED,
+      ApplicationEventType.FAILED_MAX_RETRIES, failedTransition)
+      
   .addTransition(ApplicationState.RUNNING, ApplicationState.COMPLETED,
   ApplicationEventType.FINISH, new DoneTransition())
 
@@ -186,6 +207,11 @@ public class ApplicationMasterInfo imple
   }
 
   @Override
+  public synchronized int getFailedCount() {
+    return numFailed;
+  }
+  
+  @Override
   public String getName() {
     return submissionContext.applicationName.toString();
   }
@@ -218,6 +244,8 @@ public class ApplicationMasterInfo imple
     ASMEvent<ApplicationEventType> event) {
       masterInfo.handler.handle(new ASMEvent<SNEventType>(SNEventType.CLEANUP, masterInfo));
       masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.CLEANUP, masterInfo));
+      masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(ApplicationTrackerEventType.REMOVE,
+          masterInfo));
     }
   }
 
@@ -253,6 +281,7 @@ public class ApplicationMasterInfo imple
         AMLauncherEventType.CLEANUP, masterInfo));
       masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
       ApplicationTrackerEventType.REMOVE, masterInfo));
+      masterInfo.numFailed++;
     }
   }
 
@@ -302,16 +331,13 @@ public class ApplicationMasterInfo imple
 
   /* transition to finishing state on a cleanup, for now its not used, but will need it 
    * later */
-  private static class FinishTransition implements 
+  private static class FailedTransition implements 
   SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
 
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
     ASMEvent<ApplicationEventType> event) {
-      masterInfo.handler.handle(new ASMEvent<SNEventType>(
-        SNEventType.CLEANUP, masterInfo));
-      masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(
-        AMLauncherEventType.CLEANUP, masterInfo));
+      LOG.info("Failed application: " + masterInfo.getApplicationID());
     } 
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Thu Mar 24 07:52:35 2011
@@ -178,9 +178,6 @@ public class ApplicationsManagerImpl ext
     amTracker.addMaster(user, context, clientTokenStr);
     // TODO this should happen via dispatcher. should move it out to scheudler
     // negotiator.
-    scheduler.addApplication(applicationId, user, 
-        (context.queue == null? "default" : context.queue.toString()), 
-        context.priority);
     /* schedule */    
     LOG.info("Application with id " + applicationId.id + " submitted by user " + 
         user + " with " + context);
@@ -197,7 +194,6 @@ public class ApplicationsManagerImpl ext
   throws IOException {
     /* remove the applicaiton from the scheduler  for now. Later scheduler should
      * be a event handler of adding and cleaning up appications*/
-    scheduler.removeApplication(applicationId);
     amTracker.kill(applicationId);
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java Thu Mar 24 07:52:35 2011
@@ -282,5 +282,10 @@ class SchedulerNegotiator extends Abstra
     public String getQueue() {
       throw notImplementedException;
     }
+
+    @Override
+    public int getFailedCount() {
+      throw notImplementedException;
+    }
   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java Thu Mar 24 07:52:35 2011
@@ -52,6 +52,7 @@ public class ApplicationMasterEvents {
     LAUNCH,
     LAUNCHED,
     FAILED,
+    FAILED_MAX_RETRIES,
     ALLOCATED,
     CLEANUP,
     FINISH, 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Thu Mar 24 07:52:35 2011
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.crypto.SecretKey;
-import javax.xml.crypto.NodeSetData;
 
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.ipc.Server;
@@ -43,14 +42,6 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.HeartbeatResponse;
 import org.apache.hadoop.yarn.NodeID;
 import org.apache.hadoop.yarn.NodeStatus;
@@ -58,6 +49,14 @@ import org.apache.hadoop.yarn.Registrati
 import org.apache.hadoop.yarn.Resource;
 import org.apache.hadoop.yarn.ResourceTracker;
 import org.apache.hadoop.yarn.YarnClusterMetrics;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
  * This class is responsible for the interaction with the NodeManagers.
@@ -65,7 +64,7 @@ import org.apache.hadoop.yarn.YarnCluste
  *`
  */
 public class RMResourceTrackerImpl extends AbstractService implements 
-ResourceTracker, RMResourceTracker, ResourceContext {
+ResourceTracker, ResourceContext {
   private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
   /* we dont garbage collect on nodes. A node can come back up again and re register,
    * so no use garbage collecting. Though admin can break the RM by bouncing 
@@ -99,11 +98,13 @@ ResourceTracker, RMResourceTracker, Reso
   private static final HeartbeatResponse reboot = new HeartbeatResponse();
   private long nmExpiryInterval;
 
-  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
+  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager,
+      ResourceListener listener) {
     super(RMResourceTrackerImpl.class.getName());
     reboot.reboot = true;
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.heartbeatThread = new HeartBeatThread();
+    this.resourceListener = listener;
   }
 
   @Override
@@ -253,19 +254,6 @@ ResourceTracker, RMResourceTracker, Reso
   }
 
   @Override
-  public synchronized void register(ResourceListener listener) {
-    //for now there is only one resource listener, so we dont
-    //really add it to a list.
-    this.resourceListener = listener;
-  }
-
-  @Override
-  public synchronized void unregister(ResourceListener listener) {
-    //TODO make the listener so that it dumps to a void listener
-    //rather than nullifying it.
-  }
-
-  @Override
   public List<NodeInfo> getAllNodeInfo() {
     List<NodeInfo> infoList = new ArrayList<NodeInfo>();
     synchronized (nodeManagers) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java Thu Mar 24 07:52:35 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.List;
 
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
 /**
  * The read-only interface for cluster resource
  */
@@ -29,4 +31,10 @@ public interface ResourceContext {
    * @return a list of node info
    */
   List<NodeInfo> getAllNodeInfo();
+  
+  /**
+   * Get cluster metrics from the resource tracker.
+   * @return the cluster metrics for the cluster.
+   */
+  YarnClusterMetrics getClusterMetrics();
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Thu Mar 24 07:52:35 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.Container;
 import org.apache.hadoop.yarn.ContainerID;

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java?rev=1084866&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeResponse.java Thu Mar 24 07:52:35 2011
@@ -0,0 +1,32 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+
+/**
+ * The class that encapsulates response from clusterinfo for 
+ * updates from the node managers.
+ */
+public class NodeResponse {
+  private final List<Container> completed;
+  private final List<Container> toCleanUp;
+  private final List<ApplicationID> finishedApplications;
+  
+  public NodeResponse(List<ApplicationID> finishedApplications,
+      List<Container> completed, List<Container> toKill) {
+    this.finishedApplications = finishedApplications;
+    this.completed = completed;
+    this.toCleanUp = toKill;
+  }
+  public List<ApplicationID> getFinishedApplications() {
+    return this.finishedApplications;
+  }
+  public List<Container> getCompletedContainers() {
+    return this.completed;
+  }
+  public List<Container> getContainersToCleanUp() {
+    return this.toCleanUp;
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Thu Mar 24 07:52:35 2011
@@ -25,7 +25,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
 import org.apache.hadoop.yarn.Container;
 import org.apache.hadoop.yarn.NodeID;
 import org.apache.hadoop.yarn.Resource;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Thu Mar 24 07:52:35 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 /**
@@ -32,7 +35,8 @@ import org.apache.hadoop.yarn.server.sec
  */
 @LimitedPrivate("yarn")
 @Evolving
-public interface ResourceScheduler extends ResourceListener, YarnScheduler {
+public interface ResourceScheduler extends ResourceListener, YarnScheduler, 
+  EventHandler<ASMEvent<ApplicationTrackerEventType>> {
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Mar 24 07:52:35 2011
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Priority;
 import org.apache.hadoop.yarn.ResourceRequest;
 
 /**
@@ -43,21 +42,4 @@ public interface YarnScheduler {
   List<Container> allocate(ApplicationID applicationId,
       List<ResourceRequest> ask, List<Container> release)
       throws IOException;
-  /**
-   * A new application has been submitted to the ResourceManager
-   * @param applicationId application which has been submitted
-   * @param user application user
-   * @param queue queue to which the applications is being submitted
-   * @param priority application priority
-   */
-  public void addApplication(ApplicationID applicationId, String user, 
-      String queue, Priority priority) 
-  throws IOException;
-  
-  /**
-   * A submitted application has completed.
-   * @param applicationId completed application
-   */
-  public void removeApplication(ApplicationID applicationId)
-  throws IOException;
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Mar 24 07:52:35 2011
@@ -1,26 +1,27 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -34,19 +35,20 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.Container;
 import org.apache.hadoop.yarn.NodeID;
 import org.apache.hadoop.yarn.Priority;
 import org.apache.hadoop.yarn.Resource;
 import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 @LimitedPrivate("yarn")
 @Evolving
@@ -54,9 +56,9 @@ public class CapacityScheduler 
 implements ResourceScheduler, CapacitySchedulerContext {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
-  
+
   private Queue root;
-  
+
   private final static List<Container> EMPTY_CONTAINER_LIST = 
     new ArrayList<Container>();
 
@@ -68,39 +70,31 @@ implements ResourceScheduler, CapacitySc
       } else if (q1.getUtilization() > q2.getUtilization()) {
         return 1;
       }
-      
+
       return q1.getQueuePath().compareTo(q2.getQueuePath());
     }
   };
 
   private final Comparator<Application> applicationComparator = 
     new Comparator<Application>() {
-      @Override
-      public int compare(Application a1, Application a2) {
-        return a1.getApplicationId().id - a2.getApplicationId().id;
-      }
+    @Override
+    public int compare(Application a1, Application a2) {
+      return a1.getApplicationId().id - a2.getApplicationId().id;
+    }
   };
-  
+
   private CapacitySchedulerConfiguration conf;
   private ContainerTokenSecretManager containerTokenSecretManager;
-  
+
   private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
-  
-  private final ClusterTracker clusterTracker;
-  
+
+
   private Resource minimumAllocation;
-  
+
   private Map<ApplicationID, Application> applications = 
     new TreeMap<ApplicationID, Application>(
         new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
 
-  public CapacityScheduler() {
-    this.clusterTracker = createClusterTracker();
-  }
-  
-  protected ClusterTracker createClusterTracker() {
-    return new ClusterTrackerImpl();
-  }
 
   public Queue getRootQueue() {
     return root;
@@ -110,7 +104,7 @@ implements ResourceScheduler, CapacitySc
   public CapacitySchedulerConfiguration getConfiguration() {
     return conf;
   }
-  
+
   @Override
   public ContainerTokenSecretManager getContainerTokenSecretManager() {
     return containerTokenSecretManager;
@@ -127,22 +121,22 @@ implements ResourceScheduler, CapacitySc
     this.conf = new CapacitySchedulerConfiguration(conf);
     this.minimumAllocation = this.conf.getMinimumAllocation();
     this.containerTokenSecretManager = containerTokenSecretManager;
-    
+
     initializeQueues(this.conf);
   }
 
   @Private
   public static final String ROOT = "root";
-  
+
   @Private
   public static final String ROOT_QUEUE = 
     CapacitySchedulerConfiguration.PREFIX + ROOT;
-  
+
   private void initializeQueues(CapacitySchedulerConfiguration conf) {
     root = parseQueue(conf, null, ROOT);
     LOG.info("Initialized root queue " + root);
   }
-  
+
   private Queue parseQueue(CapacitySchedulerConfiguration conf, 
       Queue parent, String queueName) {
     Queue queue;
@@ -162,29 +156,37 @@ implements ResourceScheduler, CapacitySc
               parentQueue, 
               childQueueName);
         childQueues.add(childQueue);
-        
+
         queues.put(childQueueName, childQueue);
       }
       parentQueue.setChildQueues(childQueues);
-      
+
       queue = parentQueue;
     }
-    
+
     LOG.info("Initialized queue: " + queue);
     return queue;
   }
   
-  @Override
+  /**
+   * Add an application to the capacity scheduler. This application needs to be 
+   * tracked. 
+   * @param applicationId the application id of this application
+   * @param user the user who owns the application
+   * @param queueName the queue which the application belongs to
+   * @param priority the priority of the application
+   * @throws IOException
+   */
   public void addApplication(ApplicationID applicationId, 
       String user, String queueName, Priority priority)
   throws IOException {
     Queue queue = queues.get(queueName);
-    
+
     if (queue == null) {
       throw new IOException("Application " + applicationId + 
           " submitted by user " + user + " to unknown queue: " + queueName);
     }
-    
+
     if (!(queue instanceof LeafQueue)) {
       throw new IOException("Application " + applicationId + 
           " submitted by user " + user + " to non-leaf queue: " + queueName);
@@ -196,7 +198,7 @@ implements ResourceScheduler, CapacitySc
     } catch (AccessControlException ace) {
       throw new IOException(ace);
     }
-    
+
     applications.put(applicationId, application);
 
     LOG.info("Application Submission: " + applicationId.id + 
@@ -205,14 +207,19 @@ implements ResourceScheduler, CapacitySc
         ", currently active: " + applications.size());
   }
 
-  @Override
+  /**
+   * Remove an application. Releases the resources of the application and 
+   * then makes sure its removed from data structures of the scheduler.
+   * @param applicationId the applicationId of the application
+   * @throws IOException
+   */
   public void removeApplication(ApplicationID applicationId)
-      throws IOException {
+  throws IOException {
     Application application = getApplication(applicationId);
-    
+
     if (application == null) {
-//      throw new IOException("Unknown application " + applicationId + 
-//          " has completed!");
+      //      throw new IOException("Unknown application " + applicationId + 
+      //          " has completed!");
       LOG.info("Unknown application " + applicationId + " has completed!");
       return;
     }
@@ -221,30 +228,19 @@ implements ResourceScheduler, CapacitySc
     Queue queue = queues.get(application.getQueue().getQueueName());
     LOG.info("DEBUG --- removeApplication - appId: " + applicationId + " queue: " + queue);
     queue.finishApplication(application, queue.getQueueName());
-    
+
     // Release containers and update queue capacities
     processReleasedContainers(application, application.getCurrentContainers());
-    
+
     // Inform all NodeManagers about completion of application
-    clusterTracker.finishedApplication(applicationId, 
+    finishedApplication(applicationId, 
         application.getAllNodesForApplication());
-    
+
     // Remove from our data-structure
     applications.remove(applicationId);
   }
 
   @Override
-  public NodeInfo addNode(NodeID nodeId,String hostName,
-      Node node, Resource capability) {
-    return clusterTracker.addNode(nodeId, hostName, node, capability);
-  }
-
-  @Override
-  public void removeNode(NodeInfo node) {
-    clusterTracker.removeNode(node);
-  }
-
-  @Override
   public List<Container> allocate(ApplicationID applicationId,
       List<ResourceRequest> ask, List<Container> release)
       throws IOException {
@@ -264,10 +260,10 @@ implements ResourceScheduler, CapacitySc
 
     // Update application requests
     application.updateResourceRequests(ask);
-    
+
     // Release ununsed containers and update queue capacities
     processReleasedContainers(application, release);
-    
+
     LOG.info("DEBUG --- allocate: post-update");
     application.showRequests();
 
@@ -285,29 +281,29 @@ implements ResourceScheduler, CapacitySc
       normalizeRequest(ask);
     }
   }
-  
+
   private void normalizeRequest(ResourceRequest ask) {
     int memory = ask.capability.memory;
     int minMemory = minimumAllocation.memory;
     ask.capability.memory =
       minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0));
   }
-  
+
 
   @Override
   public synchronized NodeResponse nodeUpdate(NodeInfo node, 
       Map<CharSequence,List<Container>> containers ) {
-    
+
     LOG.info("nodeUpdate: " + node);
-    
-    NodeResponse nodeResponse = clusterTracker.nodeUpdate(node, containers);
+
+    NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
 
     // Completed containers
     processCompletedContainers(nodeResponse.getCompletedContainers());
-    
+    NodeManager nm = nodes.get(node.getHostName());
     // Assign new containers
-    root.assignContainers(clusterTracker, node);
-    
+    root.assignContainers(clusterResource, nm);
+
     return nodeResponse;
   }
 
@@ -322,11 +318,11 @@ implements ResourceScheduler, CapacitySc
 
         // Inform the queue
         LeafQueue queue = (LeafQueue)application.getQueue();
-        queue.completedContainer(clusterTracker, container, application);
+        queue.completedContainer(clusterResource, container, application);
       }
     }
   }
-   
+
   private synchronized void processReleasedContainers(Application application,
       List<Container> releasedContainers) {
     // Inform the application
@@ -335,7 +331,7 @@ implements ResourceScheduler, CapacitySc
     // Inform clusterTracker
     List<Container> unusedContainers = new ArrayList<Container>();
     for (Container container : releasedContainers) {
-      if (clusterTracker.releaseContainer(
+      if (releaseContainer(
           application.getApplicationId(), 
           container)) {
         unusedContainers.add(container);
@@ -345,9 +341,95 @@ implements ResourceScheduler, CapacitySc
     // Update queue capacities
     processCompletedContainers(unusedContainers);
   }
-  
+
   private synchronized Application getApplication(ApplicationID applicationId) {
     return applications.get(applicationId);
   }
 
+  @Override
+  public synchronized void handle(ASMEvent<ApplicationTrackerEventType> event) {
+    switch(event.getType()) {
+    case ADD:
+      try {
+        addApplication(event.getAppContext().getApplicationID(), 
+            event.getAppContext().getUser(), event.getAppContext().getQueue(),
+            event.getAppContext().getSubmissionContext().priority);
+      } catch(IOException ie) {
+        LOG.error("Error in adding an application to the scheduler", ie);
+        //TODO do proper error handling to shutdown the Resource Manager is we 
+        // are not able to handle this.
+      }
+      break;
+    case REMOVE:
+      try {
+        removeApplication(event.getAppContext().getApplicationID());
+      } catch(IOException ie) {
+        LOG.error("Error in removing application", ie);
+        //TODO have to be shutdown the RM in case of this.
+        // do a graceful shutdown.
+      }
+      break;
+    }
+  }
+  
+  private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
+  private Resource clusterResource = new Resource();
+  
+ 
+  public synchronized Resource getClusterResource() {
+    return clusterResource;
+  }
+
+  @Override
+  public synchronized void removeNode(NodeInfo nodeInfo) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        clusterResource, nodeInfo.getTotalCapability());
+    nodes.remove(nodeInfo.getHostName());
+  }
+  
+  public synchronized boolean isTracked(NodeInfo nodeInfo) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    return (node == null? false: true);
+  }
+ 
+  @Override
+  public synchronized NodeInfo addNode(NodeID nodeId, 
+      String hostName, Node node, Resource capability) {
+    NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
+    nodes.put(nodeManager.getHostName(), nodeManager);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        clusterResource, nodeManager.getTotalCapability());
+    return nodeManager;
+  }
+
+  public synchronized boolean releaseContainer(ApplicationID applicationId, 
+      Container container) {
+    // Reap containers
+    LOG.info("Application " + applicationId + " released container " + container);
+    NodeManager nodeManager = nodes.get(container.hostName.toString());
+    return nodeManager.releaseContainer(container);
+  }
+  
+  public synchronized NodeResponse nodeUpdateInternal(NodeInfo nodeInfo, 
+      Map<CharSequence,List<Container>> containers) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    LOG.debug("nodeUpdate: node=" + nodeInfo.getHostName() + 
+        " available=" + nodeInfo.getAvailableResource().memory);
+    return node.statusUpdate(containers);
+    
+  }
+
+  public synchronized void addAllocatedContainers(NodeInfo nodeInfo, 
+      ApplicationID applicationId, List<Container> containers) {
+    NodeManager node = nodes.get(nodeInfo.getHostName());
+    node.allocateContainer(applicationId, containers);
+  }
+
+  public synchronized void finishedApplication(ApplicationID applicationId,
+      List<NodeInfo> nodesToNotify) {
+    for (NodeInfo node: nodesToNotify) {
+      NodeManager nodeManager = nodes.get(node.getHostName());
+      nodeManager.notifyFinishedApplication(applicationId);
+    }
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Mar 24 07:52:35 2011
@@ -33,18 +33,17 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
 
 @Private
 @Unstable
@@ -277,7 +276,7 @@ public class LeafQueue implements Queue 
   
   @Override
   public synchronized Resource 
-  assignContainers(ClusterTracker cluster, NodeInfo node) {
+  assignContainers(Resource clusterResource, NodeManager node) {
   
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getHostName() + 
@@ -303,19 +302,19 @@ public class LeafQueue implements Queue 
           if (required != null && required.numContainers > 0) {
             
             // Maximum Capacity of the queue
-            if (!assignToQueue(cluster, required.capability)) {
+            if (!assignToQueue(clusterResource, required.capability)) {
               return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
             }
             
             // User limits
-            if (!assignToUser(application.getUser(), cluster, required.capability)) {
+            if (!assignToUser(application.getUser(), clusterResource, required.capability)) {
               return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
             }
             
           }
           
           Resource assigned = 
-            assignContainersOnNode(cluster, node, application, priority);
+            assignContainersOnNode(clusterResource, node, application, priority);
   
           if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
                 assigned, 
@@ -324,7 +323,7 @@ public class LeafQueue implements Queue 
               application.getResourceRequest(priority, NodeManager.ANY).capability;
             
             // Book-keeping
-            allocateResource(cluster.getClusterResource(), 
+            allocateResource(clusterResource, 
                 application.getUser(), assignedResource);
             
             // Done
@@ -343,11 +342,11 @@ public class LeafQueue implements Queue 
     return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
   }
 
-  private synchronized boolean assignToQueue(ClusterTracker cluster, 
+  private synchronized boolean assignToQueue(Resource clusterResource, 
       Resource required) {
     float newUtilization = 
       (float)(usedResources.memory + required.memory) / 
-        (cluster.getClusterResource().memory * absoluteCapacity);
+        (clusterResource.memory * absoluteCapacity);
     if (newUtilization > absoluteMaxCapacity) {
       LOG.info(getQueueName() + 
           " current-capacity (" + getUtilization() + ") +" +
@@ -358,7 +357,7 @@ public class LeafQueue implements Queue 
     return true;
   }
   
-  private synchronized boolean assignToUser(String userName, ClusterTracker cluster,
+  private synchronized boolean assignToUser(String userName, Resource clusterResource,
       Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -370,7 +369,7 @@ public class LeafQueue implements Queue 
     // Allow progress for queues with miniscule capacity
     final int queueCapacity = 
       Math.max(
-          divideAndCeil((int)(absoluteCapacity * cluster.getClusterResource().memory), 
+          divideAndCeil((int)(absoluteCapacity * clusterResource.memory), 
               minimumAllocation.memory), 
           required.memory);
     
@@ -425,14 +424,14 @@ public class LeafQueue implements Queue 
     return (offSwitchRequest.numContainers > 0);
   }
 
-  Resource assignContainersOnNode(ClusterTracker cluster, NodeInfo node, 
+  Resource assignContainersOnNode(Resource clusterResource, NodeManager node, 
       Application application, Priority priority) {
 
     Resource assigned = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
 
     // Data-local
-    assigned = assignNodeLocalContainers(cluster, node, application, priority); 
+    assigned = assignNodeLocalContainers(clusterResource, node, application, priority); 
     if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
           assigned, 
           org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
@@ -440,7 +439,7 @@ public class LeafQueue implements Queue 
     }
 
     // Rack-local
-    assigned = assignRackLocalContainers(cluster, node, application, priority);
+    assigned = assignRackLocalContainers(clusterResource, node, application, priority);
     if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
         assigned, 
         org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
@@ -448,16 +447,16 @@ public class LeafQueue implements Queue 
   }
     
     // Off-switch
-    return assignOffSwitchContainers(cluster, node, application, priority);
+    return assignOffSwitchContainers(clusterResource, node, application, priority);
   }
 
-  Resource assignNodeLocalContainers(ClusterTracker cluster, NodeInfo node, 
+  Resource assignNodeLocalContainers(Resource clusterResource, NodeManager node, 
       Application application, Priority priority) {
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getHostName());
     if (request != null) {
       if (canAssign(application, priority, node, NodeType.DATA_LOCAL)) {
-        return assignContainer(cluster, node, application, priority, request, 
+        return assignContainer(clusterResource, node, application, priority, request, 
             NodeType.DATA_LOCAL);
       }
     }
@@ -465,13 +464,13 @@ public class LeafQueue implements Queue 
     return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
   }
 
-  Resource assignRackLocalContainers(ClusterTracker cluster, NodeInfo node, 
+  Resource assignRackLocalContainers(Resource clusterResource, NodeManager node, 
       Application application, Priority priority) {
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRackName());
     if (request != null) {
       if (canAssign(application, priority, node, NodeType.RACK_LOCAL)) {
-        return assignContainer(cluster, node, application, priority, request, 
+        return assignContainer(clusterResource, node, application, priority, request, 
             NodeType.RACK_LOCAL);
       }
     }
@@ -479,13 +478,13 @@ public class LeafQueue implements Queue 
     return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
   }
 
-  Resource assignOffSwitchContainers(ClusterTracker cluster, NodeInfo node, 
+  Resource assignOffSwitchContainers(Resource clusterResource, NodeManager node, 
       Application application, Priority priority) {
     ResourceRequest request = 
       application.getResourceRequest(priority, NodeManager.ANY);
     if (request != null) {
       if (canAssign(application, priority, node, NodeType.OFF_SWITCH)) {
-        return assignContainer(cluster, node, application, priority, request, 
+        return assignContainer(clusterResource, node, application, priority, request, 
             NodeType.OFF_SWITCH);
       }
     }
@@ -529,7 +528,7 @@ public class LeafQueue implements Queue 
     return false;
   }
   
-  private Resource assignContainer(ClusterTracker cluster, NodeInfo node, 
+  private Resource assignContainer(Resource clusterResource, NodeManager node, 
       Application application, 
       Priority priority, ResourceRequest request, NodeType type) {
     LOG.info("DEBUG --- assignContainers:" +
@@ -577,16 +576,14 @@ public class LeafQueue implements Queue 
       // Allocate container to the application
       application.allocate(type, node, priority, request, containers);
       
-      // Update resource usage on the node
-      cluster.addAllocatedContainers(node, application.getApplicationId(), 
-          containers);
+      node.allocateContainer(application.getApplicationId(), containers);
       
       LOG.info("allocatedContainer" +
           " container=" + container + 
           " queue=" + this.toString() + 
           " util=" + getUtilization() + 
           " used=" + usedResources + 
-          " cluster=" + cluster.getClusterResource());
+          " cluster=" + clusterResource);
 
       return container.resource;
     }
@@ -595,7 +592,7 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public void completedContainer(ClusterTracker cluster, 
+  public void completedContainer(Resource clusterResource, 
       Container container, Application application) {
     if (application != null) {
       // Careful! Locking order is important!
@@ -604,7 +601,7 @@ public class LeafQueue implements Queue 
         application.completedContainer(container);
         
         // Book-keeping
-        releaseResource(cluster.getClusterResource(), 
+        releaseResource(clusterResource, 
             application.getUser(), container.resource);
         
         LOG.info("completedContainer" +
@@ -612,11 +609,11 @@ public class LeafQueue implements Queue 
         		" queue=" + this + 
             " util=" + getUtilization() + 
             " used=" + usedResources + 
-            " cluster=" + cluster.getClusterResource());
+            " cluster=" + clusterResource);
       }
       
       // Inform the parent queue
-      parent.completedContainer(cluster, container, application);
+      parent.completedContainer(clusterResource, container, application);
     }
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Mar 24 07:52:35 2011
@@ -31,12 +31,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
 import org.apache.hadoop.yarn.Container;
 import org.apache.hadoop.yarn.Priority;
 import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 
 @Private
 @Evolving
@@ -251,8 +251,8 @@ public class ParentQueue implements Queu
   }
 
   @Override
-  public synchronized Resource assignContainers(ClusterTracker cluster, 
-      NodeInfo node) {
+  public synchronized Resource assignContainers(
+      Resource clusterResource, NodeManager node) {
     Resource assigned = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
 
@@ -269,14 +269,14 @@ public class ParentQueue implements Queu
       }
       
       // Schedule
-      Resource assignedToChild = assignContainersToChildQueues(cluster, node);
+      Resource assignedToChild = assignContainersToChildQueues(clusterResource, node);
 
       // Done if no child-queue assigned anything
       if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
           assignedToChild, 
           org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
         // Track resource utilization for the parent-queue
-        allocateResource(cluster.getClusterResource(), assignedToChild);
+        allocateResource(clusterResource, assignedToChild);
         
         // Track resource utilization in this pass of the scheduler
         org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
@@ -286,7 +286,7 @@ public class ParentQueue implements Queu
             " queue=" + getQueueName() + 
             " util=" + getUtilization() + 
             " used=" + usedResources + 
-            " cluster=" + cluster.getClusterResource());
+            " cluster=" + clusterResource);
 
       } else {
         break;
@@ -317,8 +317,8 @@ public class ParentQueue implements Queu
         minimumAllocation);
   }
   
-  synchronized Resource assignContainersToChildQueues(ClusterTracker cluster, 
-      NodeInfo node) {
+  synchronized Resource assignContainersToChildQueues(Resource cluster, 
+      NodeManager node) {
     Resource assigned = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
     
@@ -362,24 +362,24 @@ public class ParentQueue implements Queu
   }
   
   @Override
-  public void completedContainer(ClusterTracker cluster,
+  public void completedContainer(Resource clusterResource,
       Container container, Application application) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
       synchronized (this) {
-        releaseResource(cluster.getClusterResource(), container.resource);
+        releaseResource(clusterResource, container.resource);
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
             " util=" + getUtilization() + 
             " used=" + usedResources + 
-            " cluster=" + cluster.getClusterResource());
+            " cluster=" + clusterResource);
       }
 
       // Inform the parent
       if (parent != null) {
-        parent.completedContainer(cluster, container, application);
+        parent.completedContainer(clusterResource, container, application);
       }    
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1084866&r1=1084865&r2=1084866&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Thu Mar 24 07:52:35 2011
@@ -23,12 +23,12 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
 import org.apache.hadoop.yarn.Container;
 import org.apache.hadoop.yarn.Priority;
 import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 
 /**
  * Queue represents a node in the tree of 
@@ -141,19 +141,19 @@ extends org.apache.hadoop.yarn.server.re
   
   /**
    * Assign containers to applications in the queue or it's children (if any).
-   * @param cluster cluster resources
+   * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
    * @return
    */
-  public Resource assignContainers(ClusterTracker cluster, NodeInfo node);
+  public Resource assignContainers(Resource clusterResource, NodeManager node);
   
   /**
    * A container assigned to the queue has completed.
-   * @param cluster cluster resources
+   * @param clusterResource the resource of the cluster
    * @param container completed container
    * @param application application to which the container was assigned
    */
-  public void completedContainer(ClusterTracker cluster, 
+  public void completedContainer(Resource clusterResource,
       Container container, Application application);
 
   /**