You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [17/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,24 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
 import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
 
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
 public abstract class QueuePlacementRule {
   protected boolean create;
   
@@ -58,16 +65,20 @@ public abstract class QueuePlacementRule
    *    continue to the next rule, and null indicates that the app should be rejected.
    */
   public String assignAppToQueue(String requestedQueue, String user,
-      Groups groups, Collection<String> configuredQueues) throws IOException {
-    String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues);
-    if (create || configuredQueues.contains(queue)) {
+      Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+      throws IOException {
+   String queue = getQueueForApp(requestedQueue, user, groups,
+        configuredQueues);
+    if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
+        || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
       return queue;
     } else {
       return "";
     }
   }
   
-  public void initializeFromXml(Element el) {
+  public void initializeFromXml(Element el)
+      throws AllocationConfigurationException {
     boolean create = true;
     NamedNodeMap attributes = el.getAttributes();
     Map<String, String> args = new HashMap<String, String>();
@@ -104,15 +115,16 @@ public abstract class QueuePlacementRule
    *    continue to the next rule.
    */
   protected abstract String getQueueForApp(String requestedQueue, String user,
-      Groups groups, Collection<String> configuredQueues) throws IOException;
+      Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+      throws IOException;
 
   /**
    * Places apps in queues by username of the submitter
    */
   public static class User extends QueuePlacementRule {
     @Override
-    protected String getQueueForApp(String requestedQueue,
-        String user, Groups groups, Collection<String> configuredQueues) {
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
       return "root." + user;
     }
     
@@ -127,9 +139,9 @@ public abstract class QueuePlacementRule
    */
   public static class PrimaryGroup extends QueuePlacementRule {
     @Override
-    protected String getQueueForApp(String requestedQueue,
-        String user, Groups groups, 
-        Collection<String> configuredQueues) throws IOException {
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+        throws IOException {
       return "root." + groups.getGroups(user).get(0);
     }
     
@@ -147,12 +159,15 @@ public abstract class QueuePlacementRule
    */
   public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
     @Override
-    protected String getQueueForApp(String requestedQueue,
-        String user, Groups groups, 
-        Collection<String> configuredQueues) throws IOException {
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+        throws IOException {
       List<String> groupNames = groups.getGroups(user);
       for (int i = 1; i < groupNames.size(); i++) {
-        if (configuredQueues.contains("root." + groupNames.get(i))) {
+        String group = groupNames.get(i);
+        if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
+            || configuredQueues.get(FSQueueType.PARENT).contains(
+                "root." + group)) {
           return "root." + groupNames.get(i);
         }
       }
@@ -167,12 +182,83 @@ public abstract class QueuePlacementRule
   }
 
   /**
+   * Places apps in queues with name of the submitter under the queue
+   * returned by the nested rule.
+   */
+  public static class NestedUserQueue extends QueuePlacementRule {
+    @VisibleForTesting
+    QueuePlacementRule nestedRule;
+
+    /**
+     * Parse xml and instantiate the nested rule 
+     */
+    @Override
+    public void initializeFromXml(Element el)
+        throws AllocationConfigurationException {
+      NodeList elements = el.getChildNodes();
+
+      for (int i = 0; i < elements.getLength(); i++) {
+        Node node = elements.item(i);
+        if (node instanceof Element) {
+          Element element = (Element) node;
+          if ("rule".equals(element.getTagName())) {
+            QueuePlacementRule rule = QueuePlacementPolicy
+                .createAndInitializeRule(node);
+            if (rule == null) {
+              throw new AllocationConfigurationException(
+                  "Unable to create nested rule in nestedUserQueue rule");
+            }
+            this.nestedRule = rule;
+            break;
+          } else {
+            continue;
+          }
+        }
+      }
+
+      if (this.nestedRule == null) {
+        throw new AllocationConfigurationException(
+            "No nested rule specified in <nestedUserQueue> rule");
+      }
+      super.initializeFromXml(el);
+    }
+    
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+        throws IOException {
+      // Apply the nested rule
+      String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
+          groups, configuredQueues);
+      
+      if (queueName != null && queueName.length() != 0) {
+        if (!queueName.startsWith("root.")) {
+          queueName = "root." + queueName;
+        }
+        
+        // Verify if the queue returned by the nested rule is a configured leaf queue,
+        // if yes then skip to next rule in the queue placement policy
+        if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
+          return "";
+        }
+        return queueName + "." + user;
+      }
+      return queueName;
+    }
+
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+  
+  /**
    * Places apps in queues by requested queue of the submitter
    */
   public static class Specified extends QueuePlacementRule {
     @Override
-    protected String getQueueForApp(String requestedQueue,
-        String user, Groups groups, Collection<String> configuredQueues) {
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
       if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
         return "";
       } else {
@@ -190,34 +276,61 @@ public abstract class QueuePlacementRule
   }
   
   /**
-   * Places all apps in the default queue
+   * Places apps in the specified default queue. If no default queue is
+   * specified the app is placed in root.default queue.
    */
   public static class Default extends QueuePlacementRule {
+    @VisibleForTesting
+    String defaultQueueName;
+
     @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Collection<String> configuredQueues) {
-      return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+    public QueuePlacementRule initialize(boolean create,
+        Map<String, String> args) {
+      if (defaultQueueName == null) {
+        defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+      }
+      return super.initialize(create, args);
     }
     
     @Override
+    public void initializeFromXml(Element el)
+        throws AllocationConfigurationException {
+      defaultQueueName = el.getAttribute("queue");
+      if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
+        if (!defaultQueueName.startsWith("root.")) {
+          defaultQueueName = "root." + defaultQueueName;
+        }
+      } else {
+        defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+      }
+      super.initializeFromXml(el);
+    }
+
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+      return defaultQueueName;
+    }
+
+    @Override
     public boolean isTerminal() {
       return true;
     }
   }
-  
+
   /**
    * Rejects all apps
    */
   public static class Reject extends QueuePlacementRule {
     @Override
     public String assignAppToQueue(String requestedQueue, String user,
-        Groups groups, Collection<String> configuredQueues) {
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
       return null;
     }
     
     @Override
     protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Collection<String> configuredQueues) {
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
       throw new UnsupportedOperationException();
     }
     

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Aug 19 23:49:39 2014
@@ -23,23 +23,18 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
- * A Schedulable represents an entity that can launch tasks, such as a job
- * or a queue. It provides a common interface so that algorithms such as fair
- * sharing can be applied both within a queue and across queues. There are
- * currently two types of Schedulables: JobSchedulables, which represent a
- * single job, and QueueSchedulables, which allocate among jobs in their queue.
- *
- * Separate sets of Schedulables are used for maps and reduces. Each queue has
- * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ * A Schedulable represents an entity that can be scheduled such as an
+ * application or a queue. It provides a common interface so that algorithms
+ * such as fair sharing can be applied both within a queue and across queues.
  *
  * A Schedulable is responsible for three roles:
- * 1) It can launch tasks through assignTask().
- * 2) It provides information about the job/queue to the scheduler, including:
+ * 1) Assign resources through {@link #assignContainer}.
+ * 2) It provides information about the app/queue to the scheduler, including:
  *    - Demand (maximum number of tasks required)
- *    - Number of currently running tasks
  *    - Minimum share (for queues)
  *    - Job/queue weight (for fair sharing)
  *    - Start time and priority (for FIFO)
@@ -56,64 +51,61 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public abstract class Schedulable {
-  /** Fair share assigned to this Schedulable */
-  private Resource fairShare = Resources.createResource(0);
-
+public interface Schedulable {
   /**
    * Name of job/queue, used for debugging as well as for breaking ties in
    * scheduling order deterministically.
    */
-  public abstract String getName();
+  public String getName();
 
   /**
    * Maximum number of resources required by this Schedulable. This is defined as
    * number of currently utilized resources + number of unlaunched resources (that
    * are either not yet launched or need to be speculated).
    */
-  public abstract Resource getDemand();
+  public Resource getDemand();
 
   /** Get the aggregate amount of resources consumed by the schedulable. */
-  public abstract Resource getResourceUsage();
+  public Resource getResourceUsage();
 
   /** Minimum Resource share assigned to the schedulable. */
-  public abstract Resource getMinShare();
+  public Resource getMinShare();
 
   /** Maximum Resource share assigned to the schedulable. */
-  public abstract Resource getMaxShare();
+  public Resource getMaxShare();
 
   /** Job/queue weight in fair sharing. */
-  public abstract ResourceWeights getWeights();
+  public ResourceWeights getWeights();
 
   /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
-  public abstract long getStartTime();
+  public long getStartTime();
 
  /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
-  public abstract Priority getPriority();
+  public Priority getPriority();
 
   /** Refresh the Schedulable's demand and those of its children if any. */
-  public abstract void updateDemand();
+  public void updateDemand();
 
   /**
    * Assign a container on this node if possible, and return the amount of
    * resources assigned.
    */
-  public abstract Resource assignContainer(FSSchedulerNode node);
+  public Resource assignContainer(FSSchedulerNode node);
 
-  /** Assign a fair share to this Schedulable. */
-  public void setFairShare(Resource fairShare) {
-    this.fairShare = fairShare;
-  }
+  /**
+   * Preempt a container from this Schedulable if possible.
+   */
+  public RMContainer preemptContainer();
 
   /** Get the fair share assigned to this Schedulable. */
-  public Resource getFairShare() {
-    return fairShare;
-  }
-
-  /** Convenient toString implementation for debugging. */
-  @Override
-  public String toString() {
-    return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
-  }
+  public Resource getFairShare();
+
+  /** Assign a fair share to this Schedulable. */
+  public void setFairShare(Resource fairShare);
+
+  /**
+   * Returns true if queue has at least one app running. Always returns true for
+   * AppSchedulables.
+   */
+  public boolean isActive();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Tue Aug 19 23:49:39 2014
@@ -139,4 +139,25 @@ public abstract class SchedulingPolicy {
    */
   public abstract void computeShares(
       Collection<? extends Schedulable> schedulables, Resource totalResources);
+
+  /**
+   * Check if the resource usage is over the fair share under this policy
+   *
+   * @param usage {@link Resource} the resource usage
+   * @param fairShare {@link Resource} the fair share
+   * @return true if the usage is over the fair share, false otherwise
+   */
+  public abstract boolean checkIfUsageOverFairShare(
+      Resource usage, Resource fairShare);
+
+  /**
+   * Check if a leaf queue's AM resource usage is over its limit for this policy
+   *
+   * @param usage {@link Resource} the resource used by application masters
+   * @param maxAMResource {@link Resource} the maximum allowed resource for
+   *                                      application masters
+   * @return true if AM resource usage is over the limit
+   */
+  public abstract boolean checkIfAMResourceUsageOverLimit(
+      Resource usage, Resource maxAMResource);
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Tue Aug 19 23:49:39 2014
@@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurab
 @Private
 @Unstable
 public interface WeightAdjuster {
-  public double adjustWeight(AppSchedulable app, double curWeight);
+  public double adjustWeight(FSAppAttempt app, double curWeight);
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.res
 public class ComputeFairShares {
   
   private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
-  
+
+  /**
+   * Compute fair share of the given schedulables. Fair share is an allocation
+   * of shares considering only active schedulables, i.e. schedulables which
+   * have running apps.
+   * 
+   * @param schedulables
+   * @param totalResources
+   * @param type
+   */
+  public static void computeShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources,
+      ResourceType type) {
+    Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
+    for (Schedulable sched : schedulables) {
+      if (sched.isActive()) {
+        activeSchedulables.add(sched);
+      } else {
+        setResourceValue(0, sched.getFairShare(), type);
+      }
+    }
+
+    computeSharesInternal(activeSchedulables, totalResources, type);
+  }
+
   /**
    * Given a set of Schedulables and a number of slots, compute their weighted
    * fair shares. The min and max shares and of the Schedulables are assumed to
@@ -75,7 +100,7 @@ public class ComputeFairShares {
    * because resourceUsedWithWeightToResourceRatio is linear-time and the number of
    * iterations of binary search is a constant (dependent on desired precision).
    */
-  public static void computeShares(
+  private static void computeSharesInternal(
       Collection<? extends Schedulable> schedulables, Resource totalResources,
       ResourceType type) {
     if (schedulables.isEmpty()) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,16 @@ public class DominantResourceFairnessPol
   }
   
   @Override
+  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+    return !Resources.fitsIn(usage, fairShare);
+  }
+
+  @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return !Resources.fitsIn(usage, maxAMResource);
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Tue Aug 19 23:49:39 2014
@@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
     private static final long serialVersionUID = 5564969375856699313L;
+    private static final Resource ONE = Resources.createResource(1);
 
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch
           s1.getResourceUsage(), minShare1);
       boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
           s2.getResourceUsage(), minShare2);
-      Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() /
           s1.getWeights().getWeight(ResourceType.MEMORY);
       useToWeightRatio2 = s2.getResourceUsage().getMemory() /
@@ -120,6 +120,16 @@ public class FairSharePolicy extends Sch
   }
 
   @Override
+  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+    return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
+  }
+
+  @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_ANY;
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -89,6 +88,18 @@ public class FifoPolicy extends Scheduli
   }
 
   @Override
+  public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+    throw new UnsupportedOperationException(
+        "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " +
+            "as FifoPolicy only works for FSLeafQueue.");
+  }
+
+  @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Aug 19 23:49:39 2014
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +37,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -54,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -68,7 +64,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -76,11 +71,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -104,7 +98,8 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class FifoScheduler extends AbstractYarnScheduler implements
+public class FifoScheduler extends
+    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
     Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -114,11 +109,6 @@ public class FifoScheduler extends Abstr
 
   Configuration conf;
 
-  protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
-  private boolean initialized;
-  private Resource minimumAllocation;
-  private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
   private ActiveUsersManager activeUsersManager;
@@ -185,8 +175,60 @@ public class FifoScheduler extends Abstr
     public ActiveUsersManager getActiveUsersManager() {
       return activeUsersManager;
     }
+
+    @Override
+    public void recoverContainer(Resource clusterResource,
+        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+        return;
+      }
+      increaseUsedResources(rmContainer);
+      updateAppHeadRoom(schedulerAttempt);
+      updateAvailableResourcesMetrics();
+    }
   };
 
+  public FifoScheduler() {
+    super(FifoScheduler.class.getName());
+  }
+
+  private synchronized void initScheduler(Configuration conf) {
+    validateConf(conf);
+    //Use ConcurrentSkipListMap because applications need to be ordered
+    this.applications =
+        new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+    this.minimumAllocation =
+        Resources.createResource(conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+    this.maximumAllocation =
+        Resources.createResource(conf.getInt(
+            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    this.usePortForNodeName = conf.getBoolean(
+        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+    this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+        conf);
+    this.activeUsersManager = new ActiveUsersManager(metrics);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    initScheduler(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
   @Override
   public synchronized void setConf(Configuration conf) {
     this.conf = conf;
@@ -218,18 +260,13 @@ public class FifoScheduler extends Abstr
   }
 
   @Override
-  public Resource getMinimumResourceCapability() {
-    return minimumAllocation;
-  }
-
-  @Override
   public int getNumClusterNodes() {
     return nodes.size();
   }
-  
+
   @Override
-  public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
+  public synchronized void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
   }
 
   @Override
@@ -237,31 +274,8 @@ public class FifoScheduler extends Abstr
       reinitialize(Configuration conf, RMContext rmContext) throws IOException
   {
     setConf(conf);
-    if (!this.initialized) {
-      validateConf(conf);
-      this.rmContext = rmContext;
-      //Use ConcurrentSkipListMap because applications need to be ordered
-      this.applications =
-          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
-      this.minimumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-      this.maximumAllocation = 
-        Resources.createResource(conf.getInt(
-            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-      this.usePortForNodeName = conf.getBoolean(
-          YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, 
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
-      this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
-          conf);
-      this.activeUsersManager = new ActiveUsersManager(metrics);
-      this.initialized = true;
-    }
   }
 
-
   @Override
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@@ -278,21 +292,7 @@ public class FifoScheduler extends Abstr
         clusterResource, minimumAllocation, maximumAllocation);
 
     // Release containers
-    for (ContainerId releasedContainer : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainer);
-      if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER, 
-             "Unauthorized access or invalid container", "FifoScheduler", 
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainer);
-      }
-      containerCompleted(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainer, 
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
+    releaseContainers(release, application);
 
     synchronized (application) {
 
@@ -332,50 +332,35 @@ public class FifoScheduler extends Abstr
     }
   }
 
-  @VisibleForTesting
-  FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication app =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-      ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
-    return app == null ? null : new SchedulerAppReport(app);
-  }
-  
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
-    return app == null ? null : app.getResourceUsageReport();
-  }
-  
   private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
 
-  private synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
-    SchedulerApplication application =
-        new SchedulerApplication(DEFAULT_QUEUE, user);
+  @VisibleForTesting
+  public synchronized void addApplication(ApplicationId applicationId,
+      String queue, String user, boolean isAppRecovering) {
+    SchedulerApplication<FiCaSchedulerApp> application =
+        new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
-  private synchronized void
+  @VisibleForTesting
+  public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
-          boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
+          boolean transferStateFromPreviousAttempt,
+          boolean isAttemptRecovering) {
+    SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
     // TODO: Fix store
@@ -392,14 +377,22 @@ public class FifoScheduler extends Abstr
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    rmContext.getDispatcher().getEventHandler().handle(
+    if (isAttemptRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
             RMAppAttemptEventType.ATTEMPT_ADDED));
+    }
   }
 
   private synchronized void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
+    SchedulerApplication<FiCaSchedulerApp> application =
+        applications.get(applicationId);
     if (application == null){
       LOG.warn("Couldn't find application " + applicationId);
       return;
@@ -417,7 +410,7 @@ public class FifoScheduler extends Abstr
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
       throws IOException {
     FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication application =
+    SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     if (application == null || attempt == null) {
       throw new IOException("Unknown application " + applicationAttemptId + 
@@ -433,7 +426,7 @@ public class FifoScheduler extends Abstr
         LOG.info("Skip killing " + container.getContainerId());
         continue;
       }
-      containerCompleted(container,
+      completedContainer(container,
         SchedulerUtils.createAbnormalContainerStatus(
           container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
         RMContainerEventType.KILL);
@@ -454,10 +447,13 @@ public class FifoScheduler extends Abstr
         " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
+    for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
         .entrySet()) {
-      FiCaSchedulerApp application =
-          (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+      FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
+      if (application == null) {
+        continue;
+      }
+
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -494,10 +490,13 @@ public class FifoScheduler extends Abstr
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (SchedulerApplication application : applications.values()) {
+    for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
       FiCaSchedulerApp attempt =
           (FiCaSchedulerApp) application.getCurrentAppAttempt();
-      attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+      if (attempt == null) {
+        continue;
+      }
+      updateAppHeadRoom(attempt);
     }
   }
 
@@ -668,11 +667,10 @@ public class FifoScheduler extends Abstr
             application.allocate(type, node, priority, request, container);
         
         // Inform the node
-        node.allocateContainer(application.getApplicationId(), 
-            rmContainer);
+        node.allocateContainer(rmContainer);
 
         // Update usage for this container
-        Resources.addTo(usedResource, capability);
+        increaseUsedResources(rmContainer);
       }
 
     }
@@ -702,7 +700,7 @@ public class FifoScheduler extends Abstr
     for (ContainerStatus completedContainer : completedContainers) {
       ContainerId containerId = completedContainer.getContainerId();
       LOG.debug("Container FINISHED: " + containerId);
-      containerCompleted(getRMContainer(containerId), 
+      completedContainer(getRMContainer(containerId), 
           completedContainer, RMContainerEventType.FINISHED);
     }
 
@@ -716,9 +714,22 @@ public class FifoScheduler extends Abstr
       LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
           + node.getAvailableResource());
     }
-    
-    metrics.setAvailableResourcesToQueue(
-        Resources.subtract(clusterResource, usedResource));
+
+    updateAvailableResourcesMetrics();
+  }
+
+  private void increaseUsedResources(RMContainer rmContainer) {
+    Resources.addTo(usedResource, rmContainer.getAllocatedResource());
+  }
+
+  private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
+    schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+      usedResource));
+  }
+
+  private void updateAvailableResourcesMetrics() {
+    metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
+      usedResource));
   }
 
   @Override
@@ -728,6 +739,9 @@ public class FifoScheduler extends Abstr
     {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+        nodeAddedEvent.getAddedRMNode());
+
     }
     break;
     case NODE_REMOVED:
@@ -747,7 +761,8 @@ public class FifoScheduler extends Abstr
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -762,7 +777,8 @@ public class FifoScheduler extends Abstr
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -785,7 +801,7 @@ public class FifoScheduler extends Abstr
       ContainerExpiredSchedulerEvent containerExpiredEvent = 
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerid = containerExpiredEvent.getContainerId();
-      containerCompleted(getRMContainer(containerid), 
+      completedContainer(getRMContainer(containerid), 
           SchedulerUtils.createAbnormalContainerStatus(
               containerid, 
               SchedulerUtils.EXPIRED_CONTAINER),
@@ -797,25 +813,9 @@ public class FifoScheduler extends Abstr
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Lock(FifoScheduler.class)
-  private synchronized void containerCompleted(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -856,7 +856,6 @@ public class FifoScheduler extends Abstr
      
   }
   
-  private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
   private synchronized void removeNode(RMNode nodeInfo) {
@@ -866,7 +865,7 @@ public class FifoScheduler extends Abstr
     }
     // Kill running containers
     for(RMContainer container : node.getRunningContainers()) {
-      containerCompleted(container, 
+      completedContainer(container, 
           SchedulerUtils.createAbnormalContainerStatus(
               container.getContainerId(), 
               SchedulerUtils.LOST_CONTAINER),
@@ -903,28 +902,11 @@ public class FifoScheduler extends Abstr
   }
 
   @Override
-  public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    FiCaSchedulerNode node = getNode(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
-  }
-
-  @Override
   public RMContainer getRMContainer(ContainerId containerId) {
     FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
     return (attempt == null) ? null : attempt.getRMContainer(containerId);
   }
 
-  private FiCaSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
-  }
-
   @Override
   public QueueMetrics getRootQueueMetrics() {
     return DEFAULT_QUEUE.getMetrics();
@@ -935,13 +917,14 @@ public class FifoScheduler extends Abstr
       QueueACL acl, String queueName) {
     return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
   }
-  
+
   @Override
-  public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
+  public synchronized List<ApplicationAttemptId>
+      getAppsInQueue(String queueName) {
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
-      List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
-          applications.size());
-      for (SchedulerApplication app : applications.values()) {
+      List<ApplicationAttemptId> attempts =
+          new ArrayList<ApplicationAttemptId>(applications.size());
+      for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
         attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
       }
       return attempts;
@@ -950,4 +933,7 @@ public class FifoScheduler extends Abstr
     }
   }
 
+  public Resource getUsedResource() {
+    return usedResource;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -19,22 +19,32 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +59,73 @@ public class AMRMTokenSecretManager exte
   private static final Log LOG = LogFactory
     .getLog(AMRMTokenSecretManager.class);
 
-  private SecretKey masterKey;
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
   private final Timer timer;
   private final long rollingInterval;
+  private final long activationDelay;
+  private RMContext rmContext;
 
-  private final Map<ApplicationAttemptId, byte[]> passwords =
-      new HashMap<ApplicationAttemptId, byte[]>();
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMTokenSecretManager}
    */
-  public AMRMTokenSecretManager(Configuration conf) {
-    rollMasterKey();
+  public AMRMTokenSecretManager(Configuration conf, RMContext rmContext) {
+    this.rmContext = rmContext;
     this.timer = new Timer();
     this.rollingInterval =
         conf
           .getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
             YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 2 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
   }
 
   public void start() {
-    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+      AMRMTokenSecretManagerState state =
+          AMRMTokenSecretManagerState.newInstance(
+            this.currentMasterKey.getMasterKey(), null);
+      rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
+        false);
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+      rollingInterval);
   }
 
   public void stop() {
     this.timer.cancel();
   }
 
-  public synchronized void applicationMasterFinished(
-      ApplicationAttemptId appAttemptId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application finished, removing password for " + appAttemptId);
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for " + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.remove(appAttemptId);
   }
 
   private class MasterKeyRoller extends TimerTask {
@@ -93,49 +136,100 @@ public class AMRMTokenSecretManager exte
   }
 
   @Private
-  public synchronized void setMasterKey(SecretKey masterKey) {
-    this.masterKey = masterKey;
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      AMRMTokenSecretManagerState state =
+          AMRMTokenSecretManagerState.newInstance(
+            this.currentMasterKey.getMasterKey(),
+            this.nextMasterKey.getMasterKey());
+      rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
+        true);
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized SecretKey getMasterKey() {
-    return this.masterKey;
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+      AMRMTokenSecretManagerState state =
+          AMRMTokenSecretManagerState.newInstance(
+            this.currentMasterKey.getMasterKey(), null);
+      rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
+        true);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Private
-  synchronized void rollMasterKey() {
-    LOG.info("Rolling master-key for amrm-tokens");
-    this.masterKey = generateSecret();
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  /**
-   * Create a password for a given {@link AMRMTokenIdentifier}. Used to
-   * send to the AppicationAttempt which can give it back during authentication.
-   */
-  @Override
-  public synchronized byte[] createPassword(
-      AMRMTokenIdentifier identifier) {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating password for " + applicationAttemptId);
-    }
-    byte[] password = createPassword(identifier.getBytes(), masterKey);
-    this.passwords.put(applicationAttemptId, password);
-    return password;
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+            .getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+        identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
    */
-  public synchronized void
-      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
-    AMRMTokenIdentifier identifier = token.decodeIdentifier();
-    if (LOG.isDebugEnabled()) {
+  public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    this.writeLock.lock();
+    try {
+      AMRMTokenIdentifier identifier = token.decodeIdentifier();
       LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+      appAttemptSet.add(identifier.getApplicationAttemptId());
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.put(identifier.getApplicationAttemptId(),
-      token.getPassword());
   }
 
   /**
@@ -143,19 +237,33 @@ public class AMRMTokenSecretManager exte
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */
   @Override
-  public synchronized byte[] retrievePassword(
-      AMRMTokenIdentifier identifier) throws InvalidToken {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
-    }
-    byte[] password = this.passwords.get(applicationAttemptId);
-    if (password == null) {
-      throw new InvalidToken("Password not found for ApplicationAttempt "
-          + applicationAttemptId);
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken(applicationAttemptId
+            + " not found in AMRMTokenSecretManager.");
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+        .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+            .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Invalid AMRMToken from " + applicationAttemptId);
+    } finally {
+      this.readLock.unlock();
     }
-    return password;
   }
 
   /**
@@ -167,4 +275,61 @@ public class AMRMTokenSecretManager exte
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrnetMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+        .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void recover(RMState state) {
+    if (state.getAMRMTokenSecretManagerState() != null) {
+      // recover the current master key
+      MasterKey currentKey =
+          state.getAMRMTokenSecretManagerState().getCurrentMasterKey();
+      this.currentMasterKey =
+          new MasterKeyData(currentKey, createSecretKey(currentKey.getBytes()
+            .array()));
+
+      // recover the next master key if not null
+      MasterKey nextKey =
+          state.getAMRMTokenSecretManagerState().getNextMasterKey();
+      if (nextKey != null) {
+        this.nextMasterKey =
+            new MasterKeyData(nextKey, createSecretKey(nextKey.getBytes()
+              .array()));
+        this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Aug 19 23:49:39 2014
@@ -388,7 +388,11 @@ public class DelegationTokenRenewer exte
       // If user provides incorrect token then it should not be added for
       // renewal.
       for (DelegationTokenToRenew dtr : tokenList) {
-        renewToken(dtr);
+        try {
+          renewToken(dtr);
+        } catch (IOException ioe) {
+          throw new IOException("Failed to renew token: " + dtr.token, ioe);
+        }
       }
       for (DelegationTokenToRenew dtr : tokenList) {
         addTokenToList(dtr);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -34,8 +35,8 @@ import org.apache.hadoop.yarn.security.C
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 /**
  * SecretManager for ContainerTokens. This is RM-specific and rolls the
@@ -169,11 +170,13 @@ public class RMContainerTokenSecretManag
    * @param nodeId
    * @param appSubmitter
    * @param capability
+   * @param priority
+   * @param createTime
    * @return the container-token
    */
-  public Token
-      createContainerToken(ContainerId containerId, NodeId nodeId,
-          String appSubmitter, Resource capability) {
+  public Token createContainerToken(ContainerId containerId, NodeId nodeId,
+      String appSubmitter, Resource capability, Priority priority,
+      long createTime) {
     byte[] password;
     ContainerTokenIdentifier tokenIdentifier;
     long expiryTimeStamp =
@@ -185,7 +188,8 @@ public class RMContainerTokenSecretManag
       tokenIdentifier =
           new ContainerTokenIdentifier(containerId, nodeId.toString(),
             appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
-              .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp());
+              .getMasterKey().getKeyId(),
+            ResourceManager.getClusterTimeStamp(), priority, createTime);
       password = this.createPassword(tokenIdentifier);
 
     } finally {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana
       addPersistedDelegationToken(entry.getKey(), entry.getValue());
     }
   }
+
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(ident);
+    if (info == null) {
+      throw new InvalidToken("token (" + ident.toString()
+          + ") can't be found in cache");
+    }
+    return info.getRenewDate();
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java Tue Aug 19 23:49:39 2014
@@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -110,19 +112,42 @@ public class AppBlock extends HtmlBlock 
 
     setTitle(join("Application ", aid));
 
-    info("Application Overview").
-      _("User:", app.getUser()).
-      _("Name:", app.getName()).
-      _("Application Type:", app.getApplicationType()).
-      _("Application Tags:", app.getApplicationTags()).
-      _("State:", app.getState()).
-      _("FinalStatus:", app.getFinalStatus()).
-      _("Started:", Times.format(app.getStartTime())).
-      _("Elapsed:", StringUtils.formatTime(
-        Times.elapsed(app.getStartTime(), app.getFinishTime()))).
-      _("Tracking URL:", !app.isTrackingUrlReady() ?
-        "#" : app.getTrackingUrlPretty(), app.getTrackingUI()).
-      _("Diagnostics:", app.getNote());
+    RMAppMetrics appMerics = rmApp.getRMAppMetrics();
+    RMAppAttemptMetrics attemptMetrics =
+        rmApp.getCurrentAppAttempt().getRMAppAttemptMetrics();
+    info("Application Overview")
+        ._("User:", app.getUser())
+        ._("Name:", app.getName())
+        ._("Application Type:", app.getApplicationType())
+        ._("Application Tags:", app.getApplicationTags())
+        ._("State:", app.getState())
+        ._("FinalStatus:", app.getFinalStatus())
+        ._("Started:", Times.format(app.getStartTime()))
+        ._("Elapsed:",
+            StringUtils.formatTime(Times.elapsed(app.getStartTime(),
+                app.getFinishTime())))
+        ._("Tracking URL:",
+            !app.isTrackingUrlReady() ? "#" : app.getTrackingUrlPretty(),
+            app.getTrackingUI())
+        ._("Diagnostics:", app.getNote());
+
+    DIV<Hamlet> pdiv = html.
+        _(InfoBlock.class).
+        div(_INFO_WRAP);
+    info("Application Overview").clear();
+    info("Application Metrics")
+        ._("Total Resource Preempted:",
+          appMerics.getResourcePreempted())
+        ._("Total Number of Non-AM Containers Preempted:",
+          String.valueOf(appMerics.getNumNonAMContainersPreempted()))
+        ._("Total Number of AM Containers Preempted:",
+          String.valueOf(appMerics.getNumAMContainersPreempted()))
+        ._("Resource Preempted from Current Attempt:",
+          attemptMetrics.getResourcePreempted())
+        ._("Number of Non-AM Containers Preempted from Current Attempt:",
+          String.valueOf(attemptMetrics
+            .getNumNonAMContainersPreempted()));
+    pdiv._();
 
     Collection<RMAppAttempt> attempts = rmApp.getAppAttempts().values();
     String amString =

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.uti
 import java.util.ArrayList;
 
 import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -109,7 +108,7 @@ class CapacitySchedulerPage extends RmVi
           _("Absolute Used Capacity:", percent(lqinfo.getAbsoluteUsedCapacity() / 100)).
           _("Absolute Capacity:", percent(lqinfo.getAbsoluteCapacity() / 100)).
           _("Absolute Max Capacity:", percent(lqinfo.getAbsoluteMaxCapacity() / 100)).
-          _("Used Resources:", StringEscapeUtils.escapeHtml(lqinfo.getResourcesUsed().toString())).
+          _("Used Resources:", lqinfo.getResourcesUsed().toString()).
           _("Num Schedulable Applications:", Integer.toString(lqinfo.getNumActiveApplications())).
           _("Num Non-Schedulable Applications:", Integer.toString(lqinfo.getNumPendingApplications())).
           _("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
@@ -263,7 +262,7 @@ class CapacitySchedulerPage extends RmVi
           "    var q = $('.q', data.rslt.obj).first().text();",
           "    if (q == 'root') q = '';",
           "    else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';",
-          "    $('#apps').dataTable().fnFilter(q, 3, true);",
+          "    $('#apps').dataTable().fnFilter(q, 4, true);",
           "  });",
           "  $('#cs').show();",
           "});")._().

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java Tue Aug 19 23:49:39 2014
@@ -137,7 +137,7 @@ class DefaultSchedulerPage extends RmVie
           "  $('#cs').bind('select_node.jstree', function(e, data) {",
           "    var q = $('.q', data.rslt.obj).first().text();",
             "    if (q == 'root') q = '';",
-          "    $('#apps').dataTable().fnFilter(q, 3);",
+          "    $('#apps').dataTable().fnFilter(q, 4);",
           "  });",
           "  $('#cs').show();",
           "});")._();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,8 @@ import static org.apache.hadoop.yarn.web
 
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang.StringEscapeUtils;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -60,7 +63,14 @@ public class FairSchedulerAppsBlock exte
     super(ctx);
     FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
     fsinfo = new FairSchedulerInfo(scheduler);
-    apps = rmContext.getRMApps();
+    apps = new ConcurrentHashMap<ApplicationId, RMApp>();
+    for (Map.Entry<ApplicationId, RMApp> entry : rmContext.getRMApps().entrySet()) {
+      if (!(RMAppState.NEW.equals(entry.getValue().getState())
+          || RMAppState.NEW_SAVING.equals(entry.getValue().getState())
+          || RMAppState.SUBMITTED.equals(entry.getValue().getState()))) {
+        apps.put(entry.getKey(), entry.getValue());
+      }
+    }
     this.conf = conf;
   }
   

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Tue Aug 19 23:49:39 2014
@@ -22,7 +22,6 @@ import static org.apache.hadoop.yarn.uti
 
 import java.util.Collection;
 
-import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -65,20 +64,16 @@ public class FairSchedulerPage extends R
     @Override
     protected void render(Block html) {
       ResponseInfo ri = info("\'" + qinfo.getQueueName() + "\' Queue Status").
-          _("Used Resources:", StringEscapeUtils.escapeHtml(
-              qinfo.getUsedResources().toString())).
+          _("Used Resources:", qinfo.getUsedResources().toString()).
           _("Num Active Applications:", qinfo.getNumActiveApplications()).
           _("Num Pending Applications:", qinfo.getNumPendingApplications()).
-          _("Min Resources:", StringEscapeUtils.escapeHtml(
-              qinfo.getMinResources().toString())).
-          _("Max Resources:", StringEscapeUtils.escapeHtml(
-              qinfo.getMaxResources().toString()));
+          _("Min Resources:", qinfo.getMinResources().toString()).
+          _("Max Resources:", qinfo.getMaxResources().toString());
       int maxApps = qinfo.getMaxApplications();
       if (maxApps < Integer.MAX_VALUE) {
           ri._("Max Running Applications:", qinfo.getMaxApplications());
       }
-      ri._("Fair Share:", StringEscapeUtils.escapeHtml(
-        qinfo.getFairShare().toString()));
+      ri._("Fair Share:", qinfo.getFairShare().toString());
 
       html._(InfoBlock.class);
 
@@ -215,7 +210,7 @@ public class FairSchedulerPage extends R
           "    var q = $('.q', data.rslt.obj).first().text();",
           "    if (q == 'root') q = '';",
           "    else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';",
-          "    $('#apps').dataTable().fnFilter(q, 3, true);",
+          "    $('#apps').dataTable().fnFilter(q, 4, true);",
           "  });",
           "  $('#cs').show();",
           "});")._().