You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tg...@apache.org on 2012/10/24 16:07:55 UTC

svn commit: r1401699 - in /hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/hadoop-y...

Author: tgraves
Date: Wed Oct 24 14:07:54 2012
New Revision: 1401699

URL: http://svn.apache.org/viewvc?rev=1401699&view=rev
Log:
YARN-177. CapacityScheduler - adding a queue while the RM is running has wacky results (acmurthy via tgraves)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1401699&r1=1401698&r2=1401699&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Wed Oct 24 14:07:54 2012
@@ -45,6 +45,9 @@ Release 0.23.5 - UNRELEASED
     YARN-174. Modify NodeManager to pass the user's configuration even when
     rebooting. (vinodkv)
 
+    YARN-177. CapacityScheduler - adding a queue while the RM is running has 
+    wacky results (acmurthy via tgraves)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1401699&r1=1401698&r2=1401699&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Oct 24 14:07:54 2012
@@ -51,6 +51,12 @@ extends org.apache.hadoop.yarn.server.re
   public CSQueue getParent();
 
   /**
+   * Set the parent <code>Queue</code>.
+   * @param newParentQueue new parent queue
+   */
+  public void setParent(CSQueue newParentQueue);
+
+  /**
    * Get the queue name.
    * @return the queue name
    */
@@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.re
   
   /**
    * Reinitialize the queue.
-   * @param queue new queue to re-initalize from
+   * @param newlyParsedQueue new queue to re-initalize from
    * @param clusterResource resources in the cluster
    */
-  public void reinitialize(CSQueue queue, Resource clusterResource) 
+  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) 
   throws IOException;
 
    /**

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1401699&r1=1401698&r2=1401699&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Oct 24 14:07:54 2012
@@ -218,7 +218,7 @@ public class LeafQueue implements CSQueu
   {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    float absCapacity = parent.getAbsoluteCapacity() * capacity;
+    float absCapacity = getParent().getAbsoluteCapacity() * capacity;
     CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
 
     this.capacity = capacity; 
@@ -251,7 +251,7 @@ public class LeafQueue implements CSQueu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
 
     LOG.info("Initializing " + queueName + "\n" +
         "capacity = " + capacity +
@@ -334,10 +334,15 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public CSQueue getParent() {
+  public synchronized CSQueue getParent() {
     return parent;
   }
-
+  
+  @Override
+  public synchronized void setParent(CSQueue newParentQueue) {
+    this.parent = (ParentQueue)newParentQueue;
+  }
+  
   @Override
   public String getQueueName() {
     return queueName;
@@ -345,7 +350,7 @@ public class LeafQueue implements CSQueu
 
   @Override
   public String getQueuePath() {
-    return parent.getQueuePath() + "." + getQueueName();
+    return getParent().getQueuePath() + "." + getQueueName();
   }
 
   /**
@@ -425,7 +430,9 @@ public class LeafQueue implements CSQueu
   synchronized void setMaxCapacity(float maximumCapacity) {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
+    float absMaxCapacity = 
+        CSQueueUtils.computeAbsoluteMaximumCapacity(
+            maximumCapacity, getParent());
     CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
     
     this.maximumCapacity = maximumCapacity;
@@ -448,10 +455,6 @@ public class LeafQueue implements CSQueu
     this.userLimitFactor = userLimitFactor;
   }
 
-  synchronized void setParentQueue(CSQueue parent) {
-    this.parent = parent;
-  }
-  
   @Override
   public synchronized int getNumApplications() {
     return getNumPendingApplications() + getNumActiveApplications();
@@ -549,26 +552,28 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public synchronized void reinitialize(CSQueue queue, Resource clusterResource) 
+  public synchronized void reinitialize(
+      CSQueue newlyParsedQueue, Resource clusterResource) 
   throws IOException {
     // Sanity check
-    if (!(queue instanceof LeafQueue) || 
-        !queue.getQueuePath().equals(getQueuePath())) {
+    if (!(newlyParsedQueue instanceof LeafQueue) || 
+        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
       throw new IOException("Trying to reinitialize " + getQueuePath() + 
-          " from " + queue.getQueuePath());
+          " from " + newlyParsedQueue.getQueuePath());
     }
 
-    LeafQueue leafQueue = (LeafQueue)queue;
+    LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
     setupQueueConfigs(
         clusterResource,
-        leafQueue.capacity, leafQueue.absoluteCapacity, 
-        leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, 
-        leafQueue.userLimit, leafQueue.userLimitFactor, 
-        leafQueue.maxApplications,
-        leafQueue.getMaxApplicationsPerUser(),
-        leafQueue.getMaximumActiveApplications(), 
-        leafQueue.getMaximumActiveApplicationsPerUser(),
-        leafQueue.state, leafQueue.acls);
+        newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, 
+        newlyParsedLeafQueue.maximumCapacity, 
+        newlyParsedLeafQueue.absoluteMaxCapacity, 
+        newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, 
+        newlyParsedLeafQueue.maxApplications,
+        newlyParsedLeafQueue.getMaxApplicationsPerUser(),
+        newlyParsedLeafQueue.getMaximumActiveApplications(), 
+        newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
+        newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
   }
 
   @Override
@@ -581,7 +586,7 @@ public class LeafQueue implements CSQueu
     }
 
     // Check if parent-queue allows access
-    return parent.hasAccess(acl, user);
+    return getParent().hasAccess(acl, user);
   }
 
   @Override
@@ -639,10 +644,10 @@ public class LeafQueue implements CSQueu
 
     // Inform the parent queue
     try {
-      parent.submitApplication(application, userName, queue);
+      getParent().submitApplication(application, userName, queue);
     } catch (AccessControlException ace) {
       LOG.info("Failed to submit application to parent-queue: " + 
-          parent.getQueuePath(), ace);
+          getParent().getQueuePath(), ace);
       removeApplication(application, user);
       throw ace;
     }
@@ -698,7 +703,7 @@ public class LeafQueue implements CSQueu
     }
 
     // Inform the parent queue
-    parent.finishApplication(application, queue);
+    getParent().finishApplication(application, queue);
   }
 
   public synchronized void removeApplication(SchedulerApp application, User user) {
@@ -1337,7 +1342,7 @@ public class LeafQueue implements CSQueu
       }
 
       // Inform the parent queue
-      parent.completedContainer(clusterResource, application, 
+      getParent().completedContainer(clusterResource, application, 
           node, rmContainer, null, event);
     }
   }
@@ -1347,7 +1352,7 @@ public class LeafQueue implements CSQueu
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
     ++numContainers;
 
     // Update user metrics
@@ -1372,7 +1377,7 @@ public class LeafQueue implements CSQueu
     // Update queue metrics
     Resources.subtractFrom(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
     --numContainers;
 
     // Update user metrics
@@ -1403,7 +1408,7 @@ public class LeafQueue implements CSQueu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        this, getParent(), clusterResource, minimumAllocation);
     
     // Update application properties
     for (SchedulerApp application : activeApplications) {
@@ -1474,7 +1479,7 @@ public class LeafQueue implements CSQueu
     synchronized (this) {
       allocateResource(clusterResource, application, container.getResource());
     }
-    parent.recoverContainer(clusterResource, application, container);
+    getParent().recoverContainer(clusterResource, application, container);
 
   }
   

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1401699&r1=1401698&r2=1401699&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Oct 24 14:07:54 2012
@@ -60,7 +60,7 @@ public class ParentQueue implements CSQu
 
   private static final Log LOG = LogFactory.getLog(ParentQueue.class);
 
-  private final CSQueue parent;
+  private CSQueue parent;
   private final String queueName;
   
   private float capacity;
@@ -216,11 +216,16 @@ public class ParentQueue implements CSQu
   }
   
   @Override
-  public CSQueue getParent() {
+  public synchronized CSQueue getParent() {
     return parent;
   }
 
   @Override
+  public synchronized void setParent(CSQueue newParentQueue) {
+    this.parent = (ParentQueue)newParentQueue;
+  }
+  
+  @Override
   public String getQueueName() {
     return queueName;
   }
@@ -357,37 +362,52 @@ public class ParentQueue implements CSQu
   }
   
   @Override
-  public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
+  public synchronized void reinitialize(
+      CSQueue newlyParsedQueue, Resource clusterResource)
   throws IOException {
     // Sanity check
-    if (!(queue instanceof ParentQueue) ||
-        !queue.getQueuePath().equals(getQueuePath())) {
+    if (!(newlyParsedQueue instanceof ParentQueue) ||
+        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
       throw new IOException("Trying to reinitialize " + getQueuePath() +
-          " from " + queue.getQueuePath());
+          " from " + newlyParsedQueue.getQueuePath());
     }
 
-    ParentQueue parentQueue = (ParentQueue)queue;
+    ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
 
     // Set new configs
     setupQueueConfigs(clusterResource,
-        parentQueue.capacity, parentQueue.absoluteCapacity,
-        parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
-        parentQueue.state, parentQueue.acls);
+        newlyParsedParentQueue.capacity, 
+        newlyParsedParentQueue.absoluteCapacity,
+        newlyParsedParentQueue.maximumCapacity, 
+        newlyParsedParentQueue.absoluteMaxCapacity,
+        newlyParsedParentQueue.state, 
+        newlyParsedParentQueue.acls);
 
     // Re-configure existing child queues and add new ones
     // The CS has already checked to ensure all existing child queues are present!
     Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
-    Map<String, CSQueue> newChildQueues = getQueues(parentQueue.childQueues);
+    Map<String, CSQueue> newChildQueues = 
+        getQueues(newlyParsedParentQueue.childQueues);
     for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
       String newChildQueueName = e.getKey();
       CSQueue newChildQueue = e.getValue();
 
       CSQueue childQueue = currentChildQueues.get(newChildQueueName);
-      if (childQueue != null){
+      
+      // Check if the child-queue already exists
+      if (childQueue != null) {
+        // Re-init existing child queues
         childQueue.reinitialize(newChildQueue, clusterResource);
         LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
       } else {
+        // New child queue, do not re-init
+        
+        // Set parent to 'this'
+        newChildQueue.setParent(this);
+        
+        // Save in list of current child queues
         currentChildQueues.put(newChildQueueName, newChildQueue);
+        
         LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
       }
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1401699&r1=1401698&r2=1401699&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Oct 24 14:07:54 2012
@@ -374,4 +374,42 @@ public class TestCapacityScheduler {
 
     Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
   }
+
+  @Test
+  public void testRefreshQueuesWithNewQueue() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
+      new RMContainerTokenSecretManager(conf)));
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // Add a new queue b4
+    String B4 = B + ".b4";
+    float B4_CAPACITY = 10;
+    
+    B3_CAPACITY -= B4_CAPACITY;
+    try {
+      conf.setCapacity(A, 80f);
+      conf.setCapacity(B, 20f);
+      conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"});
+      conf.setCapacity(B1, B1_CAPACITY);
+      conf.setCapacity(B2, B2_CAPACITY);
+      conf.setCapacity(B3, B3_CAPACITY);
+      conf.setCapacity(B4, B4_CAPACITY);
+      cs.reinitialize(conf,null);
+      checkQueueCapacities(cs, 80f, 20f);
+      
+      // Verify parent for B4
+      CSQueue rootQueue = cs.getRootQueue();
+      CSQueue queueB = findQueue(rootQueue, B);
+      CSQueue queueB4 = findQueue(queueB, B4);
+
+      assertEquals(queueB, queueB4.getParent());
+    } finally {
+      B3_CAPACITY += B4_CAPACITY;
+    }
+  }
+
 }