You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/05/10 00:46:41 UTC

svn commit: r1480824 [2/2] - in /hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java...

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu May  9 22:46:39 2013
@@ -23,11 +23,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -57,9 +58,12 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
+@Private
+@Unstable
 public class FSSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
@@ -83,7 +87,9 @@ public class FSSchedulerApp extends Sche
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
-  
+
+  final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -230,6 +236,9 @@ public class FSSchedulerApp extends Sche
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    // remove from preemption map if it is completed
+    preemptionMap.remove(rmContainer);
   }
 
   synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -306,8 +315,7 @@ public class FSSchedulerApp extends Sche
    * Used only by unit tests
    * @return total current reservations
    */
-  @Stable
-  @Private
+  @VisibleForTesting
   public synchronized Resource getCurrentReservation() {
     return currentReservation;
   }
@@ -572,4 +580,18 @@ public class FSSchedulerApp extends Sche
         " priority " + priority);
     allowedLocalityLevel.put(priority, level);
   }
+
+  // Preemption related methods
+  public void addPreemption(RMContainer container, long time) {
+    assert preemptionMap.get(container) == null;
+    preemptionMap.put(container, time);
+  }
+
+  public Long getContainerPreemptionTime(RMContainer container) {
+    return preemptionMap.get(container);
+  }
+
+  public Set<RMContainer> getPreemptionContainers() {
+    return preemptionMap.keySet();
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Thu May  9 22:46:39 2013
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -39,6 +41,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
+@Private
+@Unstable
 public class FSSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May  9 22:46:39 2013
@@ -24,8 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements Re
   private Resource clusterCapacity = 
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
 
-  // How often tasks are preempted (must be longer than a couple
+  // How often tasks are preempted 
+  protected long preemptionInterval; 
+  
+  // ms to wait before force killing containers (must be longer than a couple
   // of heartbeats to give task-kill commands a chance to act).
-  protected long preemptionInterval = 15000;
-
+  protected long waitTimeBeforeKill; 
+  
+  // Containers whose AMs have been warned that they will be preempted soon.
+  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+  
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -225,10 +234,6 @@ public class FairScheduler implements Re
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
-
-    // Update recorded capacity of root queue (child queues are updated
-    // when fair share is calculated).
-    rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
   }
 
   /**
@@ -335,34 +340,78 @@ public class FairScheduler implements Re
     // Sort containers into reverse order of priority
     Collections.sort(runningContainers, new Comparator<RMContainer>() {
       public int compare(RMContainer c1, RMContainer c2) {
-        return c2.getContainer().getPriority().compareTo(
+        int ret = c2.getContainer().getPriority().compareTo(
             c1.getContainer().getPriority());
+        if (ret == 0) {
+          return c2.getContainerId().compareTo(c1.getContainerId());
+        }
+        return ret;
       }
     });
+    
+    // Scan down the list of containers we've already warned and kill them
+    // if we need to.  Remove any containers from the list that we don't need
+    // or that are no longer running.
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if (container.getState() == RMContainerState.RUNNING &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              toPreempt, Resources.none())) {
+        warnOrKillContainer(container, apps.get(container), queues.get(container));
+        preemptedThisRound.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        warnedIter.remove();
+      }
+    }
 
-    // Scan down the sorted list of task statuses until we've killed enough
-    // tasks, making sure we don't kill too many from any queue
-    for (RMContainer container : runningContainers) {
+    // Scan down the rest of the containers until we've preempted enough, making
+    // sure we don't preempt too many from any queue
+    Iterator<RMContainer> runningIter = runningContainers.iterator();
+    while (runningIter.hasNext() &&
+        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
+      RMContainer container = runningIter.next();
       FSLeafQueue sched = queues.get(container);
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-            "res=" + container.getContainer().getResource() +
-            ") from queue " + sched.getName());
-        ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
+      if (!preemptedThisRound.contains(container) &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              sched.getResourceUsage(), sched.getFairShare())) {
+        warnOrKillContainer(container, apps.get(container), sched);
+        
+        warnedContainers.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      }
+    }
+  }
+  
+  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+      FSLeafQueue queue) {
+    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+        "res=" + container.getContainer().getResource() +
+        ") from queue " + queue.getName());
+    
+    Long time = app.getContainerPreemptionTime(container);
+
+    if (time != null) {
+      // if we asked for preemption more than waitTimeBeforeKill ms ago,
+      // proceed with kill
+      if (time + waitTimeBeforeKill < clock.getTime()) {
+        ContainerStatus status =
+          SchedulerUtils.createAbnormalContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
-
-        toPreempt = Resources.subtract(toPreempt,
-            container.getContainer().getResource());
-        if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-          break;
-        }
+        LOG.info("Killing container" + container +
+            " (after waiting for premption for " +
+            (clock.getTime() - time) + "ms)");
       }
+    } else {
+      // track the request in the FSSchedulerApp itself
+      app.addPreemption(container, clock.getTime());
     }
   }
 
@@ -487,11 +536,11 @@ public class FairScheduler implements Re
     return clusterCapacity;
   }
 
-  public Clock getClock() {
+  public synchronized Clock getClock() {
     return clock;
   }
 
-  protected void setClock(Clock clock) {
+  protected synchronized void setClock(Clock clock) {
     this.clock = clock;
   }
 
@@ -617,6 +666,7 @@ public class FairScheduler implements Re
     } else {
       application.containerCompleted(rmContainer, containerStatus, event);
       node.releaseContainer(container);
+      updateRootQueueMetrics();
     }
 
     LOG.info("Application " + applicationAttemptId +
@@ -628,6 +678,7 @@ public class FairScheduler implements Re
   private synchronized void addNode(RMNode node) {
     nodes.put(node.getNodeID(), new FSSchedulerNode(node));
     Resources.addTo(clusterCapacity, node.getTotalCapability());
+    updateRootQueueMetrics();
 
     LOG.info("Added node " + node.getNodeAddress() +
         " cluster capacity: " + clusterCapacity);
@@ -636,6 +687,7 @@ public class FairScheduler implements Re
   private synchronized void removeNode(RMNode rmNode) {
     FSSchedulerNode node = nodes.get(rmNode.getNodeID());
     Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+    updateRootQueueMetrics();
 
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
@@ -746,10 +798,18 @@ public class FairScheduler implements Re
         LOG.debug("allocate:" +
             " applicationAttemptId=" + appAttemptId +
             " #ask=" + ask.size());
-      }
 
+        LOG.debug("Preempting " + application.getPreemptionContainers().size()
+            + " container(s)");
+      }
+      
+      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+      for (RMContainer container : application.getPreemptionContainers()) {
+        preemptionContainerIds.add(container.getContainerId());
+      }
+      
       return new Allocation(application.pullNewlyAllocatedContainers(),
-          application.getHeadroom());
+          application.getHeadroom(), preemptionContainerIds);
     }
   }
 
@@ -832,6 +892,7 @@ public class FairScheduler implements Re
         if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
               queueMgr.getRootQueue().assignContainer(node),
               Resources.none())) {
+          assignedContainers++;
           assignedContainer = true;
         }
         if (!assignedContainer) { break; }
@@ -839,6 +900,7 @@ public class FairScheduler implements Re
         if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
       }
     }
+    updateRootQueueMetrics();
   }
 
   @Override
@@ -860,6 +922,18 @@ public class FairScheduler implements Re
     }
     return new SchedulerAppReport(applications.get(appAttemptId));
   }
+  
+  /**
+   * Subqueue metrics might be a little out of date because fair shares are
+   * recalculated at the update interval, but the root queue metrics need to
+   * be updated synchronously with allocations and completions so that cluster
+   * metrics will be consistent.
+   */
+  private void updateRootQueueMetrics() {
+    rootMetrics.setAvailableResourcesToQueue(
+        Resources.subtract(
+            clusterCapacity, rootMetrics.getAllocatedResources()));
+  }
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
@@ -950,7 +1024,9 @@ public class FairScheduler implements Re
     assignMultiple = this.conf.getAssignMultiple();
     maxAssign = this.conf.getMaxAssign();
     sizeBasedWeight = this.conf.getSizeBasedWeight();
-    
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
     if (!initialized) {
       rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu May  9 22:46:39 2013
@@ -18,12 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
-
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
+@Private
+@Evolving
 public class FairSchedulerConfiguration extends Configuration {
   public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
 
@@ -52,6 +55,11 @@ public class FairSchedulerConfiguration 
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
+  
+  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+  protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+  protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
   /** Whether to assign multiple containers in one check-in. */
   protected static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -120,4 +128,12 @@ public class FairSchedulerConfiguration 
     return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
+  
+  public int getPreemptionInterval() {
+    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  }
+  
+  public int getWaitTimeBeforeKill() {
+    return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Thu May  9 22:46:39 2013
@@ -22,14 +22,14 @@ import java.util.Comparator;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 
 @Public
-@Unstable
+@Evolving
 public abstract class SchedulingPolicy {
   private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
       new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Thu May  9 22:46:39 2013
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
@@ -29,6 +31,8 @@ import org.apache.hadoop.yarn.server.res
 
 import com.google.common.annotations.VisibleForTesting;
 
+@Private
+@Unstable
 public class FairSharePolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "Fairshare";

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Thu May  9 22:46:39 2013
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
@@ -28,6 +30,8 @@ import org.apache.hadoop.yarn.server.res
 
 import com.google.common.annotations.VisibleForTesting;
 
+@Private
+@Unstable
 public class FifoPolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "FIFO";

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Thu May  9 22:46:39 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.Map;
@@ -199,6 +200,8 @@ public class MockRM extends ResourceMana
           return client.submitApplication(req);
         } catch (YarnRemoteException e) {
           e.printStackTrace();
+        } catch (IOException e) {
+          e.printStackTrace();
         }
         return null;
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Thu May  9 22:46:39 2013
@@ -339,7 +339,7 @@ public class TestClientRMTokens {
     DelegationToken token = loggedInUser
         .doAs(new PrivilegedExceptionAction<DelegationToken>() {
           @Override
-          public DelegationToken run() throws YarnRemoteException {
+          public DelegationToken run() throws YarnRemoteException, IOException {
             GetDelegationTokenRequest request = Records
                 .newRecord(GetDelegationTokenRequest.class);
             request.setRenewer(renewerString);
@@ -355,7 +355,7 @@ public class TestClientRMTokens {
       throws IOException, InterruptedException {
     long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
       @Override
-      public Long run() throws YarnRemoteException {
+      public Long run() throws YarnRemoteException, IOException {
         RenewDelegationTokenRequest request = Records
             .newRecord(RenewDelegationTokenRequest.class);
         request.setDelegationToken(dToken);
@@ -371,7 +371,7 @@ public class TestClientRMTokens {
       throws IOException, InterruptedException {
     loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
-      public Void run() throws YarnRemoteException {
+      public Void run() throws YarnRemoteException, IOException {
         CancelDelegationTokenRequest request = Records
             .newRecord(CancelDelegationTokenRequest.class);
         request.setDelegationToken(dToken);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Thu May  9 22:46:39 2013
@@ -66,20 +66,20 @@ public class TestQueueMetrics {
     MetricsSource userSource = userSource(ms, queueName, user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
+    checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
 
     metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0);
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
-    checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
-    checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
     metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -148,25 +148,25 @@ public class TestQueueMetrics {
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
     checkApps(userSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
-    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 0, 0, 0, 0,  100*GB, 15*GB, 5, 0, 0);
-    checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
+    checkResources(queueSource, 0, 0, 0, 0, 0,  100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
 
     metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0);
     checkApps(userSource, 1, 0, 1, 0, 0, 0);
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
-    checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0);
-
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
-    checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
     metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -197,35 +197,35 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 1, 0, 0, 0, 0);
     checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
 
-    parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
-    parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
-    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
-    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
-    checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
-    checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
-    checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
-    checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
+    parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+    parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
+    checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
+    checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
 
     metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0);
     checkApps(userSource, 1, 0, 1, 0, 0, 0);
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
-    metrics.reserveResource(user, Resources.createResource(3*GB));
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+    metrics.reserveResource(user, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
-    checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1);
-    checkResources(parentQueueSource, 6*GB, 3, 3, 0,  100*GB, 9*GB, 2, 3*GB, 1);
-    checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
-    checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
-
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
-    metrics.unreserveResource(user, Resources.createResource(3*GB));
-    checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
-    checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
-    checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
+    checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
+    checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0,  100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
+    checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+    checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    metrics.unreserveResource(user, Resources.createResource(3*GB, 3));
+    checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+    checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
     metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -277,18 +277,23 @@ public class TestQueueMetrics {
   }
 
   public static void checkResources(MetricsSource source, int allocatedMB,
-      int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, 
-      int availableMB, int pendingMB, int pendingCtnrs,
-      int reservedMB, int reservedCtnrs) {
+      int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
+      long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB,
+      int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores,
+      int reservedCtnrs) {
     MetricsRecordBuilder rb = getMetrics(source);
     assertGauge("AllocatedMB", allocatedMB, rb);
+    assertGauge("AllocatedVCores", allocatedCores, rb);
     assertGauge("AllocatedContainers", allocCtnrs, rb);
     assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
     assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
     assertGauge("AvailableMB", availableMB, rb);
+    assertGauge("AvailableVCores", availableCores, rb);
     assertGauge("PendingMB", pendingMB, rb);
+    assertGauge("PendingVCores", pendingCores, rb);
     assertGauge("PendingContainers", pendingCtnrs, rb);
     assertGauge("ReservedMB", reservedMB, rb);
+    assertGauge("ReservedVCores", reservedCores, rb);
     assertGauge("ReservedContainers", reservedCtnrs, rb);
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May  9 22:46:39 2013
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -127,6 +129,7 @@ public class TestFairScheduler {
   public void tearDown() {
     scheduler = null;
     resourceManager = null;
+    QueueMetrics.clearQueueMetrics();
   }
 
   private Configuration createConfiguration() {
@@ -336,6 +339,13 @@ public class TestFairScheduler {
 
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
       getResourceUsage().getMemory());
+
+    // verify metrics
+    QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
+        .getMetrics();
+    assertEquals(1024, queue1Metrics.getAllocatedMB());
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAllocatedMB());
+    assertEquals(512, scheduler.getRootQueueMetrics().getAvailableMB());
   }
 
   @Test (timeout = 5000)
@@ -891,9 +901,16 @@ public class TestFairScheduler {
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     Configuration conf = createConfiguration();
+    
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); 
+    
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+    
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
@@ -988,15 +1005,38 @@ public class TestFairScheduler {
         Resources.createResource(2 * 1024));
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+    
+    // First verify we are adding containers to preemption list for the application
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+                                     scheduler.applications.get(app3).getPreemptionContainers()));
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+                                     scheduler.applications.get(app6).getPreemptionContainers()));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // At this point the containers should have been killed (since we are not simulating AM)
     assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+    // Start another preemption round; this call should only mark more containers for preemption
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
 
     // We should be able to claw back another container from A and B each.
     // Make sure it is lowest priority container.
     scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
         Resources.createResource(2 * 1024));
+    
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
@@ -1245,6 +1285,7 @@ public class TestFairScheduler {
     scheduler.handle(updateEvent);
 
     assertEquals(1, app.getLiveContainers().size());
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     
     // Create request at higher priority
     createSchedulingRequestExistingApplication(1024, 1, attId);
@@ -1260,6 +1301,7 @@ public class TestFairScheduler {
     // Complete container
     scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
         Arrays.asList(containerId));
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
     
     // Schedule at opening
     scheduler.update();
@@ -1271,6 +1313,7 @@ public class TestFairScheduler {
     for (RMContainer liveContainer : liveContainers) {
       Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
     }
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
   }
   
   @Test
@@ -1382,6 +1425,37 @@ public class TestFairScheduler {
     assertEquals(1, app2.getLiveContainers().size());
   }
 
+  @Test(timeout = 3000)
+  public void testMaxAssign() throws AllocationConfigurationException {
+    // set required scheduler configs
+    scheduler.assignMultiple = true;
+    scheduler.getQueueManager().getLeafQueue("root.default")
+        .setPolicy(SchedulingPolicy.getDefault());
+
+    RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384));
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId attId =
+        createSchedulingRequest(1024, "root.default", "user", 8);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+
+    // set maxAssign to 2: only 2 containers should be allocated
+    scheduler.maxAssign = 2;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Incorrect number of containers allocated", 2, app
+        .getLiveContainers().size());
+
+    // set maxAssign to -1: all remaining containers should be allocated
+    scheduler.maxAssign = -1;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Incorrect number of containers allocated", 8, app
+        .getLiveContainers().size());
+  }
+
   /**
    * Test to verify the behavior of
    * {@link FSQueue#assignContainer(FSSchedulerNode)})
@@ -1544,4 +1618,24 @@ public class TestFairScheduler {
     assertEquals(1, app.getLiveContainers().size());
     assertEquals(0, app.getReservedContainers().size());
   }
+  
+  @Test
+  public void testRemoveNodeUpdatesRootQueueMetrics() {
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+    
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(addEvent);
+    
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+    scheduler.update(); // update shouldn't change things
+    assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+    
+    NodeRemovedSchedulerEvent removeEvent = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(removeEvent);
+    
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+    scheduler.update(); // update shouldn't change things
+    assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java Thu May  9 22:46:39 2013
@@ -485,7 +485,8 @@ public class TestContainerManagerSecurit
   }
 
   private Container requestAndGetContainer(AMRMProtocol scheduler,
-      ApplicationId appID) throws YarnRemoteException, InterruptedException {
+      ApplicationId appID) throws YarnRemoteException, InterruptedException,
+      IOException {
 
     // Request a container allocation.
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Thu May  9 22:46:39 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.webproxy;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
@@ -78,9 +79,10 @@ public class AppReportFetcher {
    * @param appId the id of the application to get. 
    * @return the ApplicationReport for that app.
    * @throws YarnRemoteException on any error.
+   * @throws IOException if an I/O error occurs while fetching the report.
    */
   public ApplicationReport getApplicationReport(ApplicationId appId)
-  throws YarnRemoteException {
+  throws YarnRemoteException, IOException {
     GetApplicationReportRequest request = recordFactory
         .newRecordInstance(GetApplicationReportRequest.class);
     request.setApplicationId(appId);