You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/01/15 07:06:35 UTC

svn commit: r1558303 [3/5] - in /hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/ma...

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Jan 15 06:06:31 2014
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -120,10 +121,10 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
-public class FairScheduler implements ResourceScheduler {
+public class FairScheduler extends AbstractYarnScheduler implements
+    ResourceScheduler {
   private boolean initialized;
   private FairSchedulerConfiguration conf;
-  private RMContext rmContext;
   private Resource minimumAllocation;
   private Resource maximumAllocation;
   private Resource incrAllocation;
@@ -157,17 +158,6 @@ public class FairScheduler implements Re
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // This stores per-application scheduling information,
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
-  // This stores per-application-attempt scheduling information, indexed by
-  // attempt ID's for fast lookup.
-  @VisibleForTesting
-  protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts = 
-      new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
-
   // Nodes in the cluster, indexed by NodeId
   private Map<NodeId, FSSchedulerNode> nodes = 
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -262,10 +252,21 @@ public class FairScheduler implements Re
     return queueMgr;
   }
 
-  private RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp application = 
-        appAttempts.get(containerId.getApplicationAttemptId());
-    return (application == null) ? null : application.getRMContainer(containerId);
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  private FSSchedulerApp getCurrentAttemptForContainer(
+      ContainerId containerId) {
+    SchedulerApplication app =
+        applications.get(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    if (app != null) {
+      return (FSSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   /**
@@ -638,9 +639,11 @@ public class FairScheduler implements Re
     SchedulerApplication application =
         new SchedulerApplication(queue, user);
     applications.put(applicationId, application);
+    queue.getMetrics().submitApp(user);
 
     LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName);
+        + ", in queue: " + queueName + ", currently num of applications: "
+        + applications.size());
     rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
   }
@@ -649,31 +652,35 @@ public class FairScheduler implements Re
    * Add a new application attempt to the scheduler.
    */
   protected synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId) {
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
     SchedulerApplication application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
     FSLeafQueue queue = (FSLeafQueue) application.getQueue();
 
-    FSSchedulerApp schedulerApp =
+    FSSchedulerApp attempt =
         new FSSchedulerApp(applicationAttemptId, user,
             queue, new ActiveUsersManager(getRootQueueMetrics()),
             rmContext);
+    if (transferStateFromPreviousAttempt) {
+      attempt.transferStateFromPreviousAttempt(application
+        .getCurrentAppAttempt());
+    }
+    application.setCurrentAppAttempt(attempt);
 
     boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
-    queue.addApp(schedulerApp, runnable);
+    queue.addApp(attempt, runnable);
     if (runnable) {
-      maxRunningEnforcer.trackRunnableApp(schedulerApp);
+      maxRunningEnforcer.trackRunnableApp(attempt);
     } else {
-      maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
+      maxRunningEnforcer.trackNonRunnableApp(attempt);
     }
     
-    queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
-    appAttempts.put(applicationAttemptId, schedulerApp);
+    queue.getMetrics().submitAppAttempt(user);
 
     LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user: " + user + ", currently active: "
-        + appAttempts.size());
+        + " to scheduler from user: " + user);
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
             RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -704,24 +711,38 @@ public class FairScheduler implements Re
 
   private synchronized void removeApplication(ApplicationId applicationId,
       RMAppState finalState) {
+    SchedulerApplication application = applications.get(applicationId);
+    if (application == null){
+      LOG.warn("Couldn't find application " + applicationId);
+      return;
+    }
+    application.stop(finalState);
     applications.remove(applicationId);
   }
 
   private synchronized void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState) {
+      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
 
-    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
-
-    if (application == null) {
+    if (attempt == null || application == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
 
     // Release all the running containers
-    for (RMContainer rmContainer : application.getLiveContainers()) {
+    for (RMContainer rmContainer : attempt.getLiveContainers()) {
+      if (keepContainers
+          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+        // do not kill the running container in the case of work-preserving AM
+        // restart.
+        LOG.info("Skip killing " + rmContainer.getContainerId());
+        continue;
+      }
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
@@ -730,30 +751,26 @@ public class FairScheduler implements Re
     }
 
     // Release all reserved containers
-    for (RMContainer rmContainer : application.getReservedContainers()) {
+    for (RMContainer rmContainer : attempt.getReservedContainers()) {
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
               "Application Complete"),
-          RMContainerEventType.KILL);
+              RMContainerEventType.KILL);
     }
-
     // Clean up pending requests, metrics etc.
-    application.stop(rmAppAttemptFinalState);
+    attempt.stop(rmAppAttemptFinalState);
 
     // Inform the queue
-    FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
         .getQueueName(), false);
-    boolean wasRunnable = queue.removeApp(application);
+    boolean wasRunnable = queue.removeApp(attempt);
 
     if (wasRunnable) {
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
+      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
     } else {
-      maxRunningEnforcer.untrackNonRunnableApp(application);
+      maxRunningEnforcer.untrackNonRunnableApp(attempt);
     }
-    
-    // Remove from our data-structure
-    appAttempts.remove(applicationAttemptId);
   }
 
   /**
@@ -769,11 +786,13 @@ public class FairScheduler implements Re
     Container container = rmContainer.getContainer();
 
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+    FSSchedulerApp application =
+        getCurrentAttemptForContainer(container.getId());
+    ApplicationId appId =
+        container.getId().getApplicationAttemptId().getApplicationId();
     if (application == null) {
       LOG.info("Container " + container + " of" +
-          " unknown application " + applicationAttemptId +
+          " unknown application attempt " + appId +
           " completed with event " + event);
       return;
     }
@@ -790,10 +809,9 @@ public class FairScheduler implements Re
       updateRootQueueMetrics();
     }
 
-    LOG.info("Application " + applicationAttemptId +
-        " released container " + container.getId() +
-        " on node: " + node +
-        " with event: " + event);
+    LOG.info("Application attempt " + application.getApplicationAttemptId()
+        + " released container " + container.getId() + " on node: " + node
+        + " with event: " + event);
   }
 
   private synchronized void addNode(RMNode node) {
@@ -844,7 +862,7 @@ public class FairScheduler implements Re
       List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
     // Make sure this application exists
-    FSSchedulerApp application = appAttempts.get(appAttemptId);
+    FSSchedulerApp application = getSchedulerApp(appAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + appAttemptId);
@@ -914,12 +932,11 @@ public class FairScheduler implements Re
    */
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
     if (application == null) {
-      LOG.info("Unknown application: " + applicationAttemptId +
-          " launched container " + containerId +
-          " on node: " + node);
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
       return;
     }
 
@@ -1058,28 +1075,34 @@ public class FairScheduler implements Re
   }
   
   public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    return appAttempts.get(appAttemptId);
+    SchedulerApplication app =
+        applications.get(appAttemptId.getApplicationId());
+    if (app != null) {
+      return (FSSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
   
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId appAttemptId) {
-    if (!appAttempts.containsKey(appAttemptId)) {
+    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+    if (attempt == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return new SchedulerAppReport(appAttempts.get(appAttemptId));
+    return new SchedulerAppReport(attempt);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp app = appAttempts.get(appAttemptId);
-    if (app == null) {
+    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+    if (attempt == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return app.getResourceUsageReport();
+    return attempt.getResourceUsageReport();
   }
   
   /**
@@ -1145,7 +1168,8 @@ public class FairScheduler implements Re
       }
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1153,8 +1177,10 @@ public class FairScheduler implements Re
       }
       AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
           (AppAttemptRemovedSchedulerEvent) event;
-      removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
-        appAttemptRemovedEvent.getFinalAttemptState());
+      removeApplicationAttempt(
+          appAttemptRemovedEvent.getApplicationAttemptID(),
+          appAttemptRemovedEvent.getFinalAttemptState(),
+          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
       break;
     case CONTAINER_EXPIRED:
       if (!(event instanceof ContainerExpiredSchedulerEvent)) {
@@ -1205,6 +1231,9 @@ public class FairScheduler implements Re
       
       rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
+      // This stores per-application scheduling information
+      this.applications =
+          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
 
@@ -1327,5 +1356,4 @@ public class FairScheduler implements Re
     queue.collectSchedulerApplications(apps);
     return apps;
   }
-
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Jan 15 06:06:31 2014
@@ -85,9 +85,7 @@ public class QueueManager {
    * could be referred to as just "parent1.queue2".
    */
   public FSLeafQueue getLeafQueue(String name, boolean create) {
-    if (!name.startsWith(ROOT_QUEUE + ".")) {
-      name = ROOT_QUEUE + "." + name;
-    }
+    name = ensureRootPrefix(name);
     synchronized (queues) {
       FSQueue queue = queues.get(name);
       if (queue == null && create) {
@@ -175,12 +173,106 @@ public class QueueManager {
   }
 
   /**
+   * Make way for the given leaf queue if possible, by removing incompatible
+   * queues with no apps in them. Incompatibility could be due to
+   * (1) leafToCreate currently being a parent, or (2) an existing leaf queue in
+   * the ancestry of leafToCreate.
+   * 
+   * We will never remove the root queue or the default queue in this way.
+   *
+   * @return true if we can create leafToCreate or it already exists.
+   */
+  private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
+    leafToCreate = ensureRootPrefix(leafToCreate);
+
+    // Ensure leafToCreate is not root and doesn't have the default queue in its
+    // ancestry.
+    if (leafToCreate.equals(ROOT_QUEUE) ||
+        leafToCreate.startsWith(
+            ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
+      return false;
+    }
+
+    FSQueue queue = queues.get(leafToCreate);
+    // Queue exists already.
+    if (queue != null) {
+      if (queue instanceof FSLeafQueue) {
+        // If it's an already existing leaf, we're ok.
+        return true;
+      } else {
+        // If it's an existing parent queue, remove it if it's empty.
+        return removeQueueIfEmpty(queue);
+      }
+    }
+
+    // Queue doesn't exist already. Check if the new queue would be created
+    // under an existing leaf queue. If so, try removing that leaf queue.
+    int sepIndex = leafToCreate.length();
+    sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+    while (sepIndex != -1) {
+      String prefixString = leafToCreate.substring(0, sepIndex);
+      FSQueue prefixQueue = queues.get(prefixString);
+      if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
+        return removeQueueIfEmpty(prefixQueue);
+      }
+      sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+    }
+    return true;
+  }
+
+  /**
+   * Remove the queue if it and its descendants are all empty.
+   * @param queue the queue to remove, along with its descendants, if empty
+   * @return true if removed, false otherwise
+   */
+  private boolean removeQueueIfEmpty(FSQueue queue) {
+    if (isEmpty(queue)) {
+      removeQueue(queue);
+      return true;
+    }
+    return false;
+  }
+  
+  /**
+   * Remove a queue and all its descendants.
+   */
+  private void removeQueue(FSQueue queue) {
+    if (queue instanceof FSLeafQueue) {
+      leafQueues.remove(queue);
+    } else {
+      List<FSQueue> childQueues = queue.getChildQueues();
+      while (!childQueues.isEmpty()) {
+        removeQueue(childQueues.get(0));
+      }
+    }
+    queues.remove(queue.getName());
+    queue.getParent().getChildQueues().remove(queue);
+  }
+  
+  /**
+   * Returns true if there are no applications, running or not, in the given
+   * queue or any of its descendants.
+   */
+  protected boolean isEmpty(FSQueue queue) {
+    if (queue instanceof FSLeafQueue) {
+      FSLeafQueue leafQueue = (FSLeafQueue)queue;
+      return queue.getNumRunnableApps() == 0 &&
+          leafQueue.getNonRunnableAppSchedulables().isEmpty();
+    } else {
+      for (FSQueue child : queue.getChildQueues()) {
+        if (!isEmpty(child)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
    * Gets a queue by name.
    */
   public FSQueue getQueue(String name) {
-    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
-      name = ROOT_QUEUE + "." + name;
-    }
+    name = ensureRootPrefix(name);
     synchronized (queues) {
       return queues.get(name);
     }
@@ -190,9 +282,7 @@ public class QueueManager {
    * Return whether a queue exists already.
    */
   public boolean exists(String name) {
-    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
-      name = ROOT_QUEUE + "." + name;
-    }
+    name = ensureRootPrefix(name);
     synchronized (queues) {
       return queues.containsKey(name);
     }
@@ -214,10 +304,19 @@ public class QueueManager {
     return queues.values();
   }
   
+  private String ensureRootPrefix(String name) {
+    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+      name = ROOT_QUEUE + "." + name;
+    }
+    return name;
+  }
+  
   public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
     // Make sure all queues exist
     for (String name : queueConf.getQueueNames()) {
-      getLeafQueue(name, true);
+      if (removeEmptyIncompatibleQueues(name)) {
+        getLeafQueue(name, true);
+      }
     }
     
     for (FSQueue queue : queues.values()) {

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Jan 15 06:06:31 2014
@@ -67,9 +67,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -103,7 +105,8 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class FifoScheduler implements ResourceScheduler, Configurable {
+public class FifoScheduler extends AbstractYarnScheduler implements
+    ResourceScheduler, Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
 
@@ -114,7 +117,6 @@ public class FifoScheduler implements Re
 
   private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
-  private RMContext rmContext;
 
   protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
 
@@ -123,15 +125,6 @@ public class FifoScheduler implements Re
   private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
-
-  // Use ConcurrentSkipListMap because applications need to be ordered
-  @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
-      = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
   private ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
@@ -246,6 +239,9 @@ public class FifoScheduler implements Re
     if (!this.initialized) {
       validateConf(conf);
       this.rmContext = rmContext;
+      //Use ConcurrentSkipListMap because applications need to be ordered
+      this.applications =
+          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
       this.minimumAllocation = 
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -270,7 +266,7 @@ public class FifoScheduler implements Re
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
       List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -336,22 +332,26 @@ public class FifoScheduler implements Re
   }
 
   @VisibleForTesting
-  FiCaSchedulerApp getApplication(
-      ApplicationAttemptId applicationAttemptId) {
-    return appAttempts.get(applicationAttemptId);
+  FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication app =
+        applications.get(applicationAttemptId.getApplicationId());
+    if (app != null) {
+      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
     return app == null ? null : app.getResourceUsageReport();
   }
   
@@ -362,15 +362,18 @@ public class FifoScheduler implements Re
   private synchronized void addApplication(ApplicationId applicationId,
       String queue, String user) {
     SchedulerApplication application =
-        new SchedulerApplication(null, user);
+        new SchedulerApplication(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
-    LOG.info("Accepted application " + applicationId + " from user: " + user);
+    metrics.submitApp(user);
+    LOG.info("Accepted application " + applicationId + " from user: " + user
+        + ", currently num of applications: " + applications.size());
     rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
   }
 
-  private synchronized void addApplicationAttempt(
-      ApplicationAttemptId appAttemptId) {
+  private synchronized void
+      addApplicationAttempt(ApplicationAttemptId appAttemptId,
+          boolean transferStateFromPreviousAttempt) {
     SchedulerApplication application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -378,11 +381,16 @@ public class FifoScheduler implements Re
     FiCaSchedulerApp schedulerApp =
         new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
           activeUsersManager, this.rmContext);
-    appAttempts.put(appAttemptId, schedulerApp);
-    metrics.submitApp(user, appAttemptId.getAttemptId());
+
+    if (transferStateFromPreviousAttempt) {
+      schedulerApp.transferStateFromPreviousAttempt(application
+        .getCurrentAppAttempt());
+    }
+    application.setCurrentAppAttempt(schedulerApp);
+
+    metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
-        + " to scheduler from user " + application.getUser()
-        + ", currently active: " + appAttempts.size());
+        + " to scheduler from user " + application.getUser());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
             RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -391,37 +399,47 @@ public class FifoScheduler implements Re
   private synchronized void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
     SchedulerApplication application = applications.get(applicationId);
+    if (application == null){
+      LOG.warn("Couldn't find application " + applicationId);
+      return;
+    }
 
     // Inform the activeUsersManager
     activeUsersManager.deactivateApplication(application.getUser(),
       applicationId);
+    application.stop(finalState);
     applications.remove(applicationId);
   }
 
   private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState)
+      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
       throws IOException {
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
-    if (application == null) {
+    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    if (application == null || attempt == null) {
       throw new IOException("Unknown application " + applicationAttemptId + 
       " has completed!");
     }
 
     // Kill all 'live' containers
-    for (RMContainer container : application.getLiveContainers()) {
-      containerCompleted(container, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getContainerId(), 
-              SchedulerUtils.COMPLETED_APPLICATION),
-          RMContainerEventType.KILL);
+    for (RMContainer container : attempt.getLiveContainers()) {
+      if (keepContainers
+          && container.getState().equals(RMContainerState.RUNNING)) {
+        // do not kill the running container in the case of work-preserving AM
+        // restart.
+        LOG.info("Skip killing " + container.getContainerId());
+        continue;
+      }
+      containerCompleted(container,
+        SchedulerUtils.createAbnormalContainerStatus(
+          container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
+        RMContainerEventType.KILL);
     }
 
     // Clean up pending requests, metrics etc.
-    application.stop(rmAppAttemptFinalState);
-
-    // Remove the application
-    appAttempts.remove(applicationAttemptId);
+    attempt.stop(rmAppAttemptFinalState);
   }
   
   /**
@@ -432,12 +450,13 @@ public class FifoScheduler implements Re
   private void assignContainers(FiCaSchedulerNode node) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
-        " #applications=" + appAttempts.size());
+        " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
+    for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
         .entrySet()) {
-      FiCaSchedulerApp application = e.getValue();
+      FiCaSchedulerApp application =
+          (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -474,8 +493,10 @@ public class FifoScheduler implements Re
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (FiCaSchedulerApp application : appAttempts.values()) {
-      application.setHeadroom(Resources.subtract(clusterResource, usedResource));
+    for (SchedulerApplication application : applications.values()) {
+      FiCaSchedulerApp attempt =
+          (FiCaSchedulerApp) application.getCurrentAppAttempt();
+      attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
     }
   }
 
@@ -744,7 +765,8 @@ public class FifoScheduler implements Re
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -754,7 +776,8 @@ public class FifoScheduler implements Re
       try {
         doneApplicationAttempt(
           appAttemptRemovedEvent.getApplicationAttemptID(),
-          appAttemptRemovedEvent.getFinalAttemptState());
+          appAttemptRemovedEvent.getFinalAttemptState(),
+          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
       } catch(IOException ie) {
         LOG.error("Unable to remove application "
             + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
@@ -780,12 +803,11 @@ public class FifoScheduler implements Re
 
   private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
     if (application == null) {
-      LOG.info("Unknown application: " + applicationAttemptId + 
-          " launched container " + containerId +
-          " on node: " + node);
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
       // Some unknown container sneaked into the system. Kill it.
       this.rmContext.getDispatcher().getEventHandler()
         .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
@@ -806,14 +828,16 @@ public class FifoScheduler implements Re
 
     // Get the application for the finished container
     Container container = rmContainer.getContainer();
-    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application =
+        getCurrentAttemptForContainer(container.getId());
+    ApplicationId appId =
+        container.getId().getApplicationAttemptId().getApplicationId();
     
     // Get the node on which the container was allocated
     FiCaSchedulerNode node = getNode(container.getNodeId());
     
     if (application == null) {
-      LOG.info("Unknown application: " + applicationAttemptId + 
+      LOG.info("Unknown application: " + appId + 
           " released container " + container.getId() +
           " on node: " + node + 
           " with event: " + event);
@@ -829,7 +853,7 @@ public class FifoScheduler implements Re
     // Update total usage
     Resources.subtractFrom(usedResource, container.getResource());
 
-    LOG.info("Application " + applicationAttemptId + 
+    LOG.info("Application attempt " + application.getApplicationAttemptId() + 
         " released container " + container.getId() +
         " on node: " + node + 
         " with event: " + event);
@@ -887,11 +911,22 @@ public class FifoScheduler implements Re
     FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
-  
-  private RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp application = 
-        getApplication(containerId.getApplicationAttemptId());
-    return (application == null) ? null : application.getRMContainer(containerId);
+
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  private FiCaSchedulerApp getCurrentAttemptForContainer(
+      ContainerId containerId) {
+    SchedulerApplication app =
+        applications.get(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    if (app != null) {
+      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   @Override
@@ -908,12 +943,12 @@ public class FifoScheduler implements Re
   @Override
   public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
-      List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
-          appAttempts.size());
-      for (FiCaSchedulerApp app : appAttempts.values()) {
-        apps.add(app.getApplicationAttemptId());
+      List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
+          applications.size());
+      for (SchedulerApplication app : applications.values()) {
+        attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
       }
-      return apps;
+      return attempts;
     } else {
       return null;
     }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java Wed Jan 15 06:06:31 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
@@ -53,6 +54,9 @@ public class RMPolicyProvider extends Po
     new Service(
         YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL, 
         ContainerManagementProtocolPB.class),
+    new Service(
+        CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
+        HAServiceProtocol.class),
   };
 
   @Override

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java Wed Jan 15 06:06:31 2014
@@ -43,6 +43,7 @@ public class AboutBlock extends HtmlBloc
     info("Cluster overview").
       _("Cluster ID:", cinfo.getClusterId()).
       _("ResourceManager state:", cinfo.getState()).
+      _("ResourceManager HA state:", cinfo.getHAState()).
       _("ResourceManager started on:", Times.format(cinfo.getStartedOn())).
       _("ResourceManager version:", cinfo.getRMBuildVersion() +
           " on " + cinfo.getRMVersionBuiltOn()).

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java Wed Jan 15 06:06:31 2014
@@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAcce
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -33,6 +34,7 @@ public class ClusterInfo {
   protected long id;
   protected long startedOn;
   protected STATE state;
+  protected HAServiceProtocol.HAServiceState haState;
   protected String resourceManagerVersion;
   protected String resourceManagerBuildVersion;
   protected String resourceManagerVersionBuiltOn;
@@ -48,6 +50,7 @@ public class ClusterInfo {
 
     this.id = ts;
     this.state = rm.getServiceState();
+    this.haState = rm.getRMContext().getHAServiceState();
     this.startedOn = ts;
     this.resourceManagerVersion = YarnVersionInfo.getVersion();
     this.resourceManagerBuildVersion = YarnVersionInfo.getBuildVersion();
@@ -61,6 +64,10 @@ public class ClusterInfo {
     return this.state.toString();
   }
 
+  public String getHAState() {
+    return this.haState.toString();
+  }
+
   public String getRMVersion() {
     return this.resourceManagerVersion;
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Wed Jan 15 06:06:31 2014
@@ -171,7 +171,7 @@ public class Application {
         new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
     scheduler.handle(addAppEvent);
     AppAttemptAddedSchedulerEvent addAttemptEvent =
-        new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+        new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
     scheduler.handle(addAttemptEvent);
   }
   

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Jan 15 06:06:31 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.mortbay.log.Log;
 
 public class MockNM {
 
@@ -130,12 +131,13 @@ public class MockNM {
       int containerId, ContainerState containerState) throws Exception {
     HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
         new HashMap<ApplicationId, List<ContainerStatus>>(1);
-    ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
-        BuilderUtils.newContainerId(attemptId, 1),
-        ContainerState.COMPLETE, "Success", 0);
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        BuilderUtils.newContainerId(attemptId, containerId), containerState,
+        "Success", 0);
     ArrayList<ContainerStatus> containerStatusList =
         new ArrayList<ContainerStatus>(1);
-    containerStatusList.add(amContainerStatus);
+    containerStatusList.add(containerStatus);
+    Log.info("ContainerStatus: " + containerStatus);
     nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
     return nodeHeartbeat(nodeUpdate, true);
   }
@@ -152,6 +154,7 @@ public class MockNM {
     status.setResponseId(resId);
     status.setNodeId(nodeId);
     for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
+      Log.info("entry.getValue() " + entry.getValue());
       status.setContainersStatuses(entry.getValue());
     }
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Jan 15 06:06:31 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -40,7 +41,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +60,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -122,6 +128,33 @@ public class MockRM extends ResourceMana
         attempt.getAppAttemptState());
   }
 
+  public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
+      throws Exception {
+    int timeoutSecs = 0;
+    while (getResourceScheduler().getRMContainer(containerId) == null
+        && timeoutSecs++ < 40) {
+      System.out.println("Waiting for" + containerId + " to be allocated.");
+      nm.nodeHeartbeat(true);
+      Thread.sleep(200);
+    }
+  }
+
+  public void waitForState(MockNM nm, ContainerId containerId,
+      RMContainerState containerState) throws Exception {
+    RMContainer container = getResourceScheduler().getRMContainer(containerId);
+    Assert.assertNotNull("Container shouldn't be null", container);
+    int timeoutSecs = 0;
+    while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
+      System.out.println("Container : " + containerId + " State is : "
+          + container.getState() + " Waiting for state : " + containerState);
+      nm.nodeHeartbeat(true);
+      Thread.sleep(300);
+    }
+    System.out.println("Container State is : " + container.getState());
+    Assert.assertEquals("Container state is not correct (timedout)",
+      containerState, container.getState());
+  }
+
   // get new application id
   public GetNewApplicationResponse getNewAppId() throws Exception {
     ApplicationClientProtocol client = getClientRMService();
@@ -172,7 +205,17 @@ public class MockRM extends ResourceMana
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType,
-      boolean waitForAccepted) throws Exception {
+      boolean waitForAccepted)
+      throws Exception {
+    return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+      maxAppAttempts, ts, appType, waitForAccepted, false);
+  }
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted, boolean keepContainers)
+      throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -182,6 +225,7 @@ public class MockRM extends ResourceMana
         .newRecord(SubmitApplicationRequest.class);
     ApplicationSubmissionContext sub = Records
         .newRecord(ApplicationSubmissionContext.class);
+    sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
     sub.setApplicationId(appId);
     sub.setApplicationName(name);
     sub.setMaxAppAttempts(maxAppAttempts);
@@ -421,4 +465,26 @@ public class MockRM extends ResourceMana
     // override to disable webapp
   }
 
+  public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+      MockAM am) throws Exception {
+    FinishApplicationMasterRequest req =
+        FinishApplicationMasterRequest.newInstance(
+          FinalApplicationStatus.SUCCEEDED, "", "");
+    am.unregisterAppAttempt(req);
+    am.waitForState(RMAppAttemptState.FINISHING);
+    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+    rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
+  }
+
+  public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return am;
+  }
+
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Wed Jan 15 06:06:31 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
             .currentTimeMillis(), "YARN"));
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
     RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
-        rmContext, yarnScheduler, null, asContext, config);
+        rmContext, yarnScheduler, null, asContext, config, false);
     when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
     return app;
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Jan 15 06:06:31 2014
@@ -302,7 +302,7 @@ public class TestFifoScheduler {
         new AppAddedSchedulerEvent(appId1, "queue", "user");
     fs.handle(appEvent);
     SchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1);
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
     fs.handle(attemptEvent);
 
     List<ContainerId> emptyId = new ArrayList<ContainerId>();
@@ -396,7 +396,7 @@ public class TestFifoScheduler {
         new AppAddedSchedulerEvent(appId1, "queue", "user");
     fs.handle(appEvent);
     SchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1);
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
     fs.handle(attemptEvent);
 
     ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
@@ -406,7 +406,7 @@ public class TestFifoScheduler {
         new AppAddedSchedulerEvent(appId2, "queue", "user");
     fs.handle(appEvent2);
     SchedulerEvent attemptEvent2 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId2);
+        new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
     fs.handle(attemptEvent2);
 
     List<ContainerId> emptyId = new ArrayList<ContainerId>();

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Jan 15 06:06:31 2014
@@ -28,7 +28,6 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
@@ -38,7 +37,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -295,6 +293,8 @@ public class TestRM {
         nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
             ContainerState.COMPLETE);
       }
+      nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
+        ContainerState.COMPLETE);
       am.waitForState(RMAppAttemptState.FINISHED);
       Assert.assertFalse(nmTokenSecretManager
           .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@@ -389,19 +389,19 @@ public class TestRM {
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
-    MockAM am1 = launchAM(app1, rm1, nm1);
-    finishApplicationMaster(app1, rm1, nm1, am1);
+    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
 
     // a failed app
     RMApp app2 = rm1.submitApp(200);
-    MockAM am2 = launchAM(app2, rm1, nm1);
+    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
     nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FAILED);
     rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
 
     // a killed app
     RMApp app3 = rm1.submitApp(200);
-    MockAM am3 = launchAM(app3, rm1, nm1);
+    MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
     rm1.killApp(app3.getApplicationId());
     rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
     rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@@ -441,7 +441,7 @@ public class TestRM {
 
     // a failed app
     RMApp app2 = rm1.submitApp(200);
-    MockAM am2 = launchAM(app2, rm1, nm1);
+    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
     nm1
       .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FAILED);
@@ -458,28 +458,6 @@ public class TestRM {
     Assert.assertEquals(-1, report1.getRpcPort());
   }
 
-  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
-      throws Exception {
-    RMAppAttempt attempt = app.getCurrentAppAttempt();
-    nm.nodeHeartbeat(true);
-    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
-    am.registerAppAttempt();
-    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
-    return am;
-  }
-
-  private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
-      MockAM am) throws Exception {
-    FinishApplicationMasterRequest req =
-        FinishApplicationMasterRequest.newInstance(
-          FinalApplicationStatus.SUCCEEDED, "", "");
-    am.unregisterAppAttempt(req);
-    am.waitForState(RMAppAttemptState.FINISHING);
-    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
-    am.waitForState(RMAppAttemptState.FINISHED);
-    rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
-  }
-
   public static void main(String[] args) throws Exception {
     TestRM t = new TestRM();
     t.testGetNewAppId();

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Wed Jan 15 06:06:31 2014
@@ -26,8 +26,11 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -222,4 +225,81 @@ public class TestRMHA {
     checkMonitorHealth();
     checkActiveRMFunctionality();
   }
+
+  // Handler/service counts after HA transitions must match those after init.
+  @Test
+  public void testRMDispatcherForHA() throws IOException {
+    String handlerMsg = "Expect to get the same number of handlers";
+    String serviceMsg = "Expect to get the same number of services";
+    Configuration haConf = new YarnConfiguration(configuration);
+    rm = new MockRM(haConf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new MyCountingDispatcher();
+      }
+    };
+    rm.init(haConf);
+    MyCountingDispatcher initialDispatcher =
+        (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    int handlersAfterInit = initialDispatcher.getEventHandlerCount();
+    int servicesAfterInit = rm.getServices().size();
+    assertTrue(handlersAfterInit != 0);
+
+    StateChangeRequestInfo userRequest = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
+        rm.adminService.getServiceStatus().getState());
+    assertFalse("RM is ready to become active before being started",
+        rm.adminService.getServiceStatus().isReadyToBecomeActive());
+    rm.start();
+
+    // cycle through standby and active a couple of times
+    for (int round = 0; round < 2; round++) {
+      rm.adminService.transitionToStandby(userRequest);
+      rm.adminService.transitionToActive(userRequest);
+    }
+    rm.adminService.transitionToStandby(userRequest);
+
+    rm.adminService.transitionToActive(userRequest);
+    MyCountingDispatcher activeDispatcher =
+        (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    assertEquals(handlerMsg, handlersAfterInit,
+        activeDispatcher.getEventHandlerCount());
+    assertEquals(serviceMsg, servicesAfterInit, rm.getServices().size());
+
+    rm.adminService.transitionToStandby(userRequest);
+    MyCountingDispatcher standbyDispatcher =
+        (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    assertEquals(handlerMsg, handlersAfterInit,
+        standbyDispatcher.getEventHandlerCount());
+    assertEquals(serviceMsg, servicesAfterInit, rm.getServices().size());
+
+    rm.stop();
+  }
+
+  // Dispatcher stub: counts register() calls; getEventHandler() returns null.
+  @SuppressWarnings("rawtypes")
+  class MyCountingDispatcher extends AbstractService implements Dispatcher {
+    private int eventHandlerCount = 0;
+
+    public MyCountingDispatcher() {
+      super("MyCountingDispatcher");
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return null;
+    }
+
+    // Ignores both arguments and only bumps the registration counter.
+    @Override
+    public void register(Class<? extends Enum> eventType, EventHandler handler) {
+      eventHandlerCount += 1;
+    }
+
+    public int getEventHandlerCount() {
+      return eventHandlerCount;
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Jan 15 06:06:31 2014
@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
@@ -179,7 +180,7 @@ public class TestRMRestart {
     am1.registerAppAttempt();
 
     // AM request for containers
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());    
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());   
     // kick the scheduler
     nm1.nodeHeartbeat(true);
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
@@ -1543,6 +1544,128 @@ public class TestRMRestart {
     Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
   }
 
+  // Verifies the root queue's app counters (submitted, pending, running,
+  // completed) before and after an RM restart that reuses one state store.
+  @SuppressWarnings("resource")
+  @Test
+  public void testQueueMetricsOnRMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // PHASE 1: start an RM and create state in it
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics();
+    resetQueueMetrics(qm1);
+    assertQueueMetrics(qm1, 0, 0, 0, 0);
+
+    // create app that gets launched and does allocate before RM restart
+    RMApp app1 = rm1.submitApp(200);
+    assertQueueMetrics(qm1, 1, 1, 0, 0);
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+    assertQueueMetrics(qm1, 1, 0, 1, 0);
+
+    // PHASE 2: create a new RM that restarts from the old state
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
+    resetQueueMetrics(qm2);
+    assertQueueMetrics(qm2, 0, 0, 0, 0);
+    // recover app
+    RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
+            .getCurrentAppAttempt().getAppAttemptId(), 1),
+            ContainerState.COMPLETE, "Killed AM container", 143);
+    containerStatuses.add(containerStatus);
+    nm1.registerNode(containerStatuses);
+    int retries = 0; // bounded wait (40 x 200 ms) for a second app attempt
+    while (loadedApp1.getAppAttempts().size() != 2 && retries++ < 40) {
+      Thread.sleep(200);
+    }
+
+    assertQueueMetrics(qm2, 1, 1, 0, 0);
+    nm1.nodeHeartbeat(true);
+    attempt1 = loadedApp1.getCurrentAppAttempt();
+    attemptId1 = attempt1.getAppAttemptId();
+    rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    assertQueueMetrics(qm2, 1, 0, 1, 0);
+    am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    am1.allocate("127.0.0.1" , 1000, 3, new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    conts = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+
+    // finish the AMs
+    finishApplicationMaster(loadedApp1, rm2, nm1, am1);
+    assertQueueMetrics(qm2, 1, 0, 0, 1);
+
+    // stop RM's
+    rm2.stop();
+    rm1.stop();
+  }
+
+
+  // The metrics carry over values from the previous RM: the test runs
+  // in-memory, so for a given queue name (e.g. root) there is always a
+  // single shared QueueMetrics object.
+  private int appsSubmittedCarryOn = 0;
+  private int appsPendingCarryOn = 0;
+  private int appsRunningCarryOn = 0;
+  private int appsCompletedCarryOn = 0;
+
+  private void resetQueueMetrics(QueueMetrics qm) { // records baselines only
+    this.appsCompletedCarryOn = qm.getAppsCompleted();
+    this.appsRunningCarryOn = qm.getAppsRunning();
+    this.appsPendingCarryOn = qm.getAppsPending();
+    this.appsSubmittedCarryOn = qm.getAppsSubmitted();
+  }
+
+  // Asserts qm's app counters equal the given values plus the carry-on
+  // baselines. JUnit's assertEquals takes (expected, actual), in that order.
+  private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
+      int appsPending, int appsRunning, int appsCompleted) {
+    Assert.assertEquals(appsSubmitted + appsSubmittedCarryOn,
+        qm.getAppsSubmitted());
+    Assert.assertEquals(appsPending + appsPendingCarryOn, qm.getAppsPending());
+    Assert.assertEquals(appsRunning + appsRunningCarryOn, qm.getAppsRunning());
+    Assert.assertEquals(appsCompleted + appsCompletedCarryOn,
+        qm.getAppsCompleted());
+  }
+
   public class TestMemoryRMStateStore extends MemoryRMStateStore {
     int count = 0;
     public int updateApp = 0;

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Wed Jan 15 06:06:31 2014
@@ -164,7 +164,7 @@ public class TestResourceManager {
     // Notify scheduler application is finished.
     AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
         new AppAttemptRemovedSchedulerEvent(
-          application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+          application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
     resourceManager.getResourceScheduler().handle(appRemovedEvent1);
     
     checkResourceUsage(nm1, nm2);