You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [15/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Tue Aug 19 23:49:39 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-  
+
+  public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
   private final Clock clock;
 
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -96,58 +98,69 @@ public class AllocationFileLoaderService
   }
   
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     this.allocFile = getAllocationFile(conf);
-    super.init(conf);
-  }
-  
-  @Override
-  public void start() {
-    if (allocFile == null) {
-      return;
-    }
-    reloadThread = new Thread() {
-      public void run() {
-        while (running) {
-          long time = clock.getTime();
-          long lastModified = allocFile.lastModified();
-          if (lastModified > lastSuccessfulReload &&
-              time > lastModified + ALLOC_RELOAD_WAIT_MS) {
-            try {
-              reloadAllocations();
-            } catch (Exception ex) {
+    if (allocFile != null) {
+      reloadThread = new Thread() {
+        @Override
+        public void run() {
+          while (running) {
+            long time = clock.getTime();
+            long lastModified = allocFile.lastModified();
+            if (lastModified > lastSuccessfulReload &&
+                time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+              try {
+                reloadAllocations();
+              } catch (Exception ex) {
+                if (!lastReloadAttemptFailed) {
+                  LOG.error("Failed to reload fair scheduler config file - " +
+                      "will use existing allocations.", ex);
+                }
+                lastReloadAttemptFailed = true;
+              }
+            } else if (lastModified == 0l) {
               if (!lastReloadAttemptFailed) {
-                LOG.error("Failed to reload fair scheduler config file - " +
-                    "will use existing allocations.", ex);
+                LOG.warn("Failed to reload fair scheduler config file because" +
+                    " last modified returned 0. File exists: "
+                    + allocFile.exists());
               }
               lastReloadAttemptFailed = true;
             }
-          } else if (lastModified == 0l) {
-            if (!lastReloadAttemptFailed) {
-              LOG.warn("Failed to reload fair scheduler config file because" +
-                  " last modified returned 0. File exists: " + allocFile.exists());
+            try {
+              Thread.sleep(reloadIntervalMs);
+            } catch (InterruptedException ex) {
+              LOG.info(
+                  "Interrupted while waiting to reload alloc configuration");
             }
-            lastReloadAttemptFailed = true;
-          }
-          try {
-            Thread.sleep(reloadIntervalMs);
-          } catch (InterruptedException ex) {
-            LOG.info("Interrupted while waiting to reload alloc configuration");
           }
         }
-      }
-    };
-    reloadThread.setName("AllocationFileReloader");
-    reloadThread.setDaemon(true);
-    reloadThread.start();
-    super.start();
+      };
+      reloadThread.setName("AllocationFileReloader");
+      reloadThread.setDaemon(true);
+    }
+    super.serviceInit(conf);
   }
   
   @Override
-  public void stop() {
+  public void serviceStart() throws Exception {
+    if (reloadThread != null) {
+      reloadThread.start();
+    }
+    super.serviceStart();
+  }
+  
+  @Override
+  public void serviceStop() throws Exception {
     running = false;
-    reloadThread.interrupt();
-    super.stop();
+    if (reloadThread != null) {
+      reloadThread.interrupt();
+      try {
+        reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.warn("reloadThread fails to join.");
+      }
+    }
+    super.serviceStop();
   }
   
   /**
@@ -200,6 +213,7 @@ public class AllocationFileLoaderService
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+    Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
     Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -207,6 +221,7 @@ public class AllocationFileLoaderService
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
+    float queueMaxAMShareDefault = -1.0f;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -214,8 +229,15 @@ public class AllocationFileLoaderService
     QueuePlacementPolicy newPlacementPolicy = null;
 
     // Remember all queue names so we can display them on web UI, etc.
-    Set<String> queueNamesInAllocFile = new HashSet<String>();
-
+    // configuredQueues is segregated based on whether it is a leaf queue
+    // or a parent queue. This information is used for creating queues
+    // and also for making queue placement decisions(QueuePlacementRule.java).
+    Map<FSQueueType, Set<String>> configuredQueues = 
+        new HashMap<FSQueueType, Set<String>>();
+    for (FSQueueType queueType : FSQueueType.values()) {
+      configuredQueues.put(queueType, new HashSet<String>());
+    }
+   
     // Read and parse the allocations file.
     DocumentBuilderFactory docBuilderFactory =
       DocumentBuilderFactory.newInstance();
@@ -266,6 +288,11 @@ public class AllocationFileLoaderService
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
           queueMaxAppsDefault = val;
+        } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.min(val, 1.0f);
+          queueMaxAMShareDefault = val;
         } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
             || "defaultQueueSchedulingMode".equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
@@ -289,26 +316,27 @@ public class AllocationFileLoaderService
         }
         parent = null;
       }
-      loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
-          userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
-          queueAcls, queueNamesInAllocFile);
+      loadQueue(parent, element, minQueueResources, maxQueueResources,
+          queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+          queuePolicies, minSharePreemptionTimeouts, queueAcls,
+          configuredQueues);
     }
     
     // Load placement policy and pass it configured queues
     Configuration conf = getConfig();
     if (placementPolicyElement != null) {
       newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
-          queueNamesInAllocFile, conf);
+          configuredQueues, conf);
     } else {
       newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
-          queueNamesInAllocFile);
+          configuredQueues);
     }
     
     AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
-        queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-        queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+        queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+        queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
         queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
-        newPlacementPolicy, queueNamesInAllocFile);
+        newPlacementPolicy, configuredQueues);
     
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
@@ -321,10 +349,12 @@ public class AllocationFileLoaderService
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+      Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
-      Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile) 
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls, 
+      Map<FSQueueType, Set<String>> configuredQueues) 
       throws AllocationConfigurationException {
     String queueName = element.getAttribute("name");
     if (parentName != null) {
@@ -352,6 +382,11 @@ public class AllocationFileLoaderService
         String text = ((Text)field.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
         queueMaxApps.put(queueName, val);
+      } else if ("maxAMShare".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.min(val, 1.0f);
+        queueMaxAMShares.put(queueName, val);
       } else if ("weight".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         double val = Double.parseDouble(text);
@@ -374,14 +409,21 @@ public class AllocationFileLoaderService
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
-            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-            minSharePreemptionTimeouts,
-            queueAcls, queueNamesInAllocFile);
+            queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+            queuePolicies, minSharePreemptionTimeouts, queueAcls,
+            configuredQueues);
+        configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
       }
     }
     if (isLeaf) {
-      queueNamesInAllocFile.add(queueName);
+      // if a leaf in the alloc file is marked as type='parent'
+      // then store it under 'parent'
+      if ("parent".equals(element.getAttribute("type"))) {
+        configuredQueues.get(FSQueueType.PARENT).add(queueName);
+      } else {
+        configuredQueues.get(FSQueueType.LEAF).add(queueName);
+      }
     }
     queueAcls.put(queueName, acls);
     if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Tue Aug 19 23:49:39 2014
@@ -33,21 +33,22 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 
 @Private
 @Unstable
 public class FSLeafQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
       FSLeafQueue.class.getName());
-    
-  private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
-      new ArrayList<AppSchedulable>();
-  private final List<AppSchedulable> nonRunnableAppScheds =
-      new ArrayList<AppSchedulable>();
+
+  private final List<FSAppAttempt> runnableApps = // apps that are runnable
+      new ArrayList<FSAppAttempt>();
+  private final List<FSAppAttempt> nonRunnableApps =
+      new ArrayList<FSAppAttempt>();
   
   private Resource demand = Resources.createResource(0);
   
@@ -55,6 +56,9 @@ public class FSLeafQueue extends FSQueue
   private long lastTimeAtMinShare;
   private long lastTimeAtHalfFairShare;
   
+  // Track the AM resource usage for this queue
+  private Resource amResourceUsage;
+
   private final ActiveUsersManager activeUsersManager;
   
   public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,31 +67,34 @@ public class FSLeafQueue extends FSQueue
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
     this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
+    amResourceUsage = Resource.newInstance(0, 0);
   }
   
-  public void addApp(FSSchedulerApp app, boolean runnable) {
-    AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
-    app.setAppSchedulable(appSchedulable);
+  public void addApp(FSAppAttempt app, boolean runnable) {
     if (runnable) {
-      runnableAppScheds.add(appSchedulable);
+      runnableApps.add(app);
     } else {
-      nonRunnableAppScheds.add(appSchedulable);
+      nonRunnableApps.add(app);
     }
   }
   
   // for testing
-  void addAppSchedulable(AppSchedulable appSched) {
-    runnableAppScheds.add(appSched);
+  void addAppSchedulable(FSAppAttempt appSched) {
+    runnableApps.add(appSched);
   }
   
   /**
    * Removes the given app from this queue.
    * @return whether or not the app was runnable
    */
-  public boolean removeApp(FSSchedulerApp app) {
-    if (runnableAppScheds.remove(app.getAppSchedulable())) {
+  public boolean removeApp(FSAppAttempt app) {
+    if (runnableApps.remove(app)) {
+      // Update AM resource usage
+      if (app.isAmRunning() && app.getAMResource() != null) {
+        Resources.subtractFrom(amResourceUsage, app.getAMResource());
+      }
       return true;
-    } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
+    } else if (nonRunnableApps.remove(app)) {
       return false;
     } else {
       throw new IllegalStateException("Given app to remove " + app +
@@ -95,22 +102,22 @@ public class FSLeafQueue extends FSQueue
     }
   }
   
-  public Collection<AppSchedulable> getRunnableAppSchedulables() {
-    return runnableAppScheds;
+  public Collection<FSAppAttempt> getRunnableAppSchedulables() {
+    return runnableApps;
   }
   
-  public List<AppSchedulable> getNonRunnableAppSchedulables() {
-    return nonRunnableAppScheds;
+  public List<FSAppAttempt> getNonRunnableAppSchedulables() {
+    return nonRunnableApps;
   }
   
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
-    for (AppSchedulable appSched : runnableAppScheds) {
-      apps.add(appSched.getApp().getApplicationAttemptId());
+    for (FSAppAttempt appSched : runnableApps) {
+      apps.add(appSched.getApplicationAttemptId());
     }
-    for (AppSchedulable appSched : nonRunnableAppScheds) {
-      apps.add(appSched.getApp().getApplicationAttemptId());
+    for (FSAppAttempt appSched : nonRunnableApps) {
+      apps.add(appSched.getApplicationAttemptId());
     }
   }
 
@@ -136,15 +143,19 @@ public class FSLeafQueue extends FSQueue
   @Override
   public Resource getResourceUsage() {
     Resource usage = Resources.createResource(0);
-    for (AppSchedulable app : runnableAppScheds) {
+    for (FSAppAttempt app : runnableApps) {
       Resources.addTo(usage, app.getResourceUsage());
     }
-    for (AppSchedulable app : nonRunnableAppScheds) {
+    for (FSAppAttempt app : nonRunnableApps) {
       Resources.addTo(usage, app.getResourceUsage());
     }
     return usage;
   }
 
+  public Resource getAmResourceUsage() {
+    return amResourceUsage;
+  }
+
   @Override
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
@@ -152,13 +163,13 @@ public class FSLeafQueue extends FSQueue
     Resource maxRes = scheduler.getAllocationConfiguration()
         .getMaxResources(getName());
     demand = Resources.createResource(0);
-    for (AppSchedulable sched : runnableAppScheds) {
+    for (FSAppAttempt sched : runnableApps) {
       if (Resources.equals(demand, maxRes)) {
         break;
       }
       updateDemandForApp(sched, maxRes);
     }
-    for (AppSchedulable sched : nonRunnableAppScheds) {
+    for (FSAppAttempt sched : nonRunnableApps) {
       if (Resources.equals(demand, maxRes)) {
         break;
       }
@@ -170,7 +181,7 @@ public class FSLeafQueue extends FSQueue
     }
   }
   
-  private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
+  private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
     sched.updateDemand();
     Resource toAdd = sched.getDemand();
     if (LOG.isDebugEnabled()) {
@@ -194,9 +205,9 @@ public class FSLeafQueue extends FSQueue
     }
 
     Comparator<Schedulable> comparator = policy.getComparator();
-    Collections.sort(runnableAppScheds, comparator);
-    for (AppSchedulable sched : runnableAppScheds) {
-      if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
+    Collections.sort(runnableApps, comparator);
+    for (FSAppAttempt sched : runnableApps) {
+      if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
         continue;
       }
 
@@ -209,6 +220,37 @@ public class FSLeafQueue extends FSQueue
   }
 
   @Override
+  public RMContainer preemptContainer() {
+    RMContainer toBePreempted = null;
+
+    // If this queue is not over its fair share, reject
+    if (!preemptContainerPreCheck()) {
+      return toBePreempted;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Queue " + getName() + " is going to preempt a container " +
+          "from its applications.");
+    }
+
+    // Choose the app that is most over fair share
+    Comparator<Schedulable> comparator = policy.getComparator();
+    FSAppAttempt candidateSched = null;
+    for (FSAppAttempt sched : runnableApps) {
+      if (candidateSched == null ||
+          comparator.compare(sched, candidateSched) > 0) {
+        candidateSched = sched;
+      }
+    }
+
+    // Preempt from the selected app
+    if (candidateSched != null) {
+      toBePreempted = candidateSched.preemptContainer();
+    }
+    return toBePreempted;
+  }
+
+  @Override
   public List<FSQueue> getChildQueues() {
     return new ArrayList<FSQueue>(1);
   }
@@ -247,11 +289,52 @@ public class FSLeafQueue extends FSQueue
 
   @Override
   public int getNumRunnableApps() {
-    return runnableAppScheds.size();
+    return runnableApps.size();
   }
   
   @Override
   public ActiveUsersManager getActiveUsersManager() {
     return activeUsersManager;
   }
+
+  /**
+   * Check whether this queue can run this application master under the
+   * maxAMShare limit
+   *
+   * @param amResource
+   * @return true if this queue can run
+   */
+  public boolean canRunAppAM(Resource amResource) {
+    float maxAMShare =
+        scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+    if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
+      return true;
+    }
+    Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+    Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+    return !policy
+        .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+  }
+
+  public void addAMResourceUsage(Resource amResource) {
+    if (amResource != null) {
+      Resources.addTo(amResourceUsage, amResource);
+    }
+  }
+
+  @Override
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+    // TODO Auto-generated method stub
+  }
+
+  /**
+   * Helper method to check if the queue should preempt containers
+   *
+   * @return true if check passes (can preempt) or false otherwise
+   */
+  private boolean preemptContainerPreCheck() {
+    return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
+        getFairShare());
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -32,8 +33,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 
 @Private
 @Unstable
@@ -157,6 +161,27 @@ public class FSParentQueue extends FSQue
   }
 
   @Override
+  public RMContainer preemptContainer() {
+    RMContainer toBePreempted = null;
+
+    // Find the childQueue which is most over fair share
+    FSQueue candidateQueue = null;
+    Comparator<Schedulable> comparator = policy.getComparator();
+    for (FSQueue queue : childQueues) {
+      if (candidateQueue == null ||
+          comparator.compare(queue, candidateQueue) > 0) {
+        candidateQueue = queue;
+      }
+    }
+
+    // Let the selected queue choose which of its container to preempt
+    if (candidateQueue != null) {
+      toBePreempted = candidateQueue.preemptContainer();
+    }
+    return toBePreempted;
+  }
+
+  @Override
   public List<FSQueue> getChildQueues() {
     return childQueues;
   }
@@ -200,4 +225,11 @@ public class FSParentQueue extends FSQue
     // Should never be called since all applications are submitted to LeafQueues
     return null;
   }
+
+  @Override
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+    // TODO Auto-generated method stub
+    
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Tue Aug 19 23:49:39 2014
@@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.util.resou
 
 @Private
 @Unstable
-public abstract class FSQueue extends Schedulable implements Queue {
+public abstract class FSQueue implements Queue, Schedulable {
+  private Resource fairShare = Resources.createResource(0, 0);
   private final String name;
   protected final FairScheduler scheduler;
   private final FSQueueMetrics metrics;
@@ -119,9 +120,9 @@ public abstract class FSQueue extends Sc
     // TODO: we might change these queue metrics around a little bit
     // to match the semantics of the fair scheduler.
     queueInfo.setCapacity((float) getFairShare().getMemory() /
-        scheduler.getClusterCapacity().getMemory());
+        scheduler.getClusterResource().getMemory());
     queueInfo.setCapacity((float) getResourceUsage().getMemory() /
-        scheduler.getClusterCapacity().getMemory());
+        scheduler.getClusterResource().getMemory());
     
     ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
     if (includeChildQueues) {
@@ -139,10 +140,15 @@ public abstract class FSQueue extends Sc
   public FSQueueMetrics getMetrics() {
     return metrics;
   }
-  
+
+  /** Get the fair share assigned to this Schedulable. */
+  public Resource getFairShare() {
+    return fairShare;
+  }
+
   @Override
   public void setFairShare(Resource fairShare) {
-    super.setFairShare(fairShare);
+    this.fairShare = fairShare;
     metrics.setFairShare(fairShare);
   }
   
@@ -187,4 +193,16 @@ public abstract class FSQueue extends Sc
     }
     return true;
   }
+
+  @Override
+  public boolean isActive() {
+    return getNumRunnableApps() > 0;
+  }
+
+  /** Convenient toString implementation for debugging. */
+  @Override
+  public String toString() {
+    return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+        getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Tue Aug 19 23:49:39 2014
@@ -18,28 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
 @Unstable
@@ -47,208 +35,56 @@ public class FSSchedulerNode extends Sch
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
 
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private Resource availableResource;
-  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
-  private Resource totalResourceCapability;
-
-  private volatile int numContainers;
-
-  private RMContainer reservedContainer;
-  private AppSchedulable reservedAppSchedulable;
-  
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, RMContainer> launchedContainers = 
-    new HashMap<ContainerId, RMContainer>();
-  
-  private final RMNode rmNode;
-  private final String nodeName;
+  private FSAppAttempt reservedAppSchedulable;
 
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
-    this.rmNode = node;
-    this.availableResource = Resources.clone(node.getTotalCapability());
-    totalResourceCapability =
-        Resource.newInstance(node.getTotalCapability().getMemory(), node
-            .getTotalCapability().getVirtualCores());
-    if (usePortForNodeName) {
-      nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
-    } else {
-      nodeName = rmNode.getHostName();
-    }
-  }
-
-  public RMNode getRMNode() {
-    return rmNode;
-  }
-
-  public NodeId getNodeID() {
-    return rmNode.getNodeID();
-  }
-
-  public String getHttpAddress() {
-    return rmNode.getHttpAddress();
-  }
-
-  @Override
-  public String getNodeName() {
-    return nodeName;
+    super(node, usePortForNodeName);
   }
 
   @Override
-  public String getRackName() {
-    return rmNode.getRackName();
-  }
-
-  /**
-   * The Scheduler has allocated containers on this node to the 
-   * given application.
-   * 
-   * @param applicationId application
-   * @param rmContainer allocated container
-   */
-  public synchronized void allocateContainer(ApplicationId applicationId, 
-      RMContainer rmContainer) {
-    Container container = rmContainer.getContainer();
-    deductAvailableResource(container.getResource());
-    ++numContainers;
-    
-    launchedContainers.put(container.getId(), rmContainer);
-
-    LOG.info("Assigned container " + container.getId() + 
-        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
-        ", which currently has " + numContainers + " containers, " + 
-        getUsedResource() + " used and " + 
-        getAvailableResource() + " available");
-  }
-
-  @Override
-  public synchronized Resource getAvailableResource() {
-    return availableResource;
-  }
-
-  @Override
-  public synchronized Resource getUsedResource() {
-    return usedResource;
-  }
-
-  private synchronized boolean isValidContainer(Container c) {    
-    if (launchedContainers.containsKey(c.getId())) {
-      return true;
-    }
-    return false;
-  }
-
-  private synchronized void updateResource(Container container) {
-    addAvailableResource(container.getResource());
-    --numContainers;
-  }
-  
-  /**
-   * Release an allocated container on this node.
-   * @param container container to be released
-   */
-  public synchronized void releaseContainer(Container container) {
-    if (!isValidContainer(container)) {
-      LOG.error("Invalid container released " + container);
-      return;
-    }
-
-    /* remove the containers from the nodemanger */
-    launchedContainers.remove(container.getId());
-    updateResource(container);
-
-    LOG.info("Released container " + container.getId() + 
-        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
-        ", which currently has " + numContainers + " containers, " + 
-        getUsedResource() + " used and " + getAvailableResource()
-        + " available" + ", release resources=" + true);
-  }
-
-
-  private synchronized void addAvailableResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid resource addition of null resource for "
-          + rmNode.getNodeAddress());
-      return;
-    }
-    Resources.addTo(availableResource, resource);
-    Resources.subtractFrom(usedResource, resource);
-  }
-
-  @Override
-  public Resource getTotalResource() {
-    return this.totalResourceCapability;
-  }
-
-  private synchronized void deductAvailableResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid deduction of null resource for "
-          + rmNode.getNodeAddress());
-      return;
-    }
-    Resources.subtractFrom(availableResource, resource);
-    Resources.addTo(usedResource, resource);
-  }
-
-  @Override
-  public String toString() {
-    return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +  
-      " available=" + getAvailableResource() + 
-      " used=" + getUsedResource();
-  }
-
-  @Override
-  public int getNumContainers() {
-    return numContainers;
-  }
-
-  public synchronized List<RMContainer> getRunningContainers() {
-    return new ArrayList<RMContainer>(launchedContainers.values());
-  }
-
   public synchronized void reserveResource(
-      FSSchedulerApp application, Priority priority, 
-      RMContainer reservedContainer) {
+      SchedulerApplicationAttempt application, Priority priority,
+      RMContainer container) {
     // Check if it's already reserved
-    if (this.reservedContainer != null) {
+    RMContainer reservedContainer = getReservedContainer();
+    if (reservedContainer != null) {
       // Sanity check
-      if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+      if (!container.getContainer().getNodeId().equals(getNodeID())) {
         throw new IllegalStateException("Trying to reserve" +
-            " container " + reservedContainer +
-            " on node " + reservedContainer.getReservedNode() + 
-            " when currently" + " reserved resource " + this.reservedContainer +
-            " on node " + this.reservedContainer.getReservedNode());
+            " container " + container +
+            " on node " + container.getReservedNode() + 
+            " when currently" + " reserved resource " + reservedContainer +
+            " on node " + reservedContainer.getReservedNode());
       }
       
       // Cannot reserve more than one application on a given node!
-      if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
-          reservedContainer.getContainer().getId().getApplicationAttemptId())) {
+      if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
+          .equals(container.getContainer().getId().getApplicationAttemptId())) {
         throw new IllegalStateException("Trying to reserve" +
-        		" container " + reservedContainer + 
+            " container " + container + 
             " for application " + application.getApplicationId() + 
             " when currently" +
-            " reserved container " + this.reservedContainer +
+            " reserved container " + reservedContainer +
             " on node " + this);
       }
 
       LOG.info("Updated reserved container " + 
-          reservedContainer.getContainer().getId() + " on node " + 
+          container.getContainer().getId() + " on node " + 
           this + " for application " + application);
     } else {
-      LOG.info("Reserved container " + reservedContainer.getContainer().getId() + 
+      LOG.info("Reserved container " + container.getContainer().getId() + 
           " on node " + this + " for application " + application);
     }
-    this.reservedContainer = reservedContainer;
-    this.reservedAppSchedulable = application.getAppSchedulable();
+    setReservedContainer(container);
+    this.reservedAppSchedulable = (FSAppAttempt) application;
   }
 
+  @Override
   public synchronized void unreserveResource(
-      FSSchedulerApp application) {
+      SchedulerApplicationAttempt application) {
     // Cannot unreserve for wrong application...
     ApplicationAttemptId reservedApplication = 
-        reservedContainer.getContainer().getId().getApplicationAttemptId(); 
+        getReservedContainer().getContainer().getId().getApplicationAttemptId(); 
     if (!reservedApplication.equals(
         application.getApplicationAttemptId())) {
       throw new IllegalStateException("Trying to unreserve " +  
@@ -258,22 +94,11 @@ public class FSSchedulerNode extends Sch
           " on node " + this);
     }
     
-    this.reservedContainer = null;
+    setReservedContainer(null);
     this.reservedAppSchedulable = null;
   }
 
-  public synchronized RMContainer getReservedContainer() {
-    return reservedContainer;
-  }
-
-  public synchronized AppSchedulable getReservedAppSchedulable() {
+  public synchronized FSAppAttempt getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
-  
-  @Override
-  public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
-    // we can only adjust available resource if total resource is changed.
-    Resources.addTo(this.availableResource, deltaResource);
-  }
-  
 }