You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/07 19:44:03 UTC

hadoop git commit: YARN-2824. Fixed Capacity Scheduler to not crash when some node labels are not mapped to queues, by making the default per-label capacities zero. Contributed by Wangda Tan.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 1e97f2f09 -> 2ac1be7de


YARN-2824. Fixed Capacity Scheduler to not crash when some node labels are not mapped to queues, by making the default per-label capacities zero. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ac1be7d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ac1be7d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ac1be7d

Branch: refs/heads/trunk
Commit: 2ac1be7dec4aef001e3162e364249933b2c4a6c4
Parents: 1e97f2f
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Fri Nov 7 10:39:37 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Fri Nov 7 10:39:37 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  4 +
 .../dev-support/findbugs-exclude.xml            |  1 +
 .../distributedshell/TestDistributedShell.java  |  2 +
 .../scheduler/capacity/AbstractCSQueue.java     | 28 ++++---
 .../CapacitySchedulerConfiguration.java         | 17 +---
 .../capacity/TestContainerAllocation.java       |  5 ++
 .../scheduler/capacity/TestQueueParsing.java    | 84 ++++++++++++++++++--
 7 files changed, 107 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 111aaaa..c48fb4f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -894,6 +894,10 @@ Release 2.6.0 - UNRELEASED
 
     YARN-2810. TestRMProxyUsersConf fails on Windows VMs. (Varun Vasudev via xgong)
 
+    YARN-2824. Fixed Capacity Scheduler to not crash when some node-labels are
+    not mapped to queues by making default capacities per label to be zero.
+    (Wangda Tan via vinodkv)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index e6da24c..45d7294 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -202,6 +202,7 @@
       <Field name="accessibleLabels" />
       <Field name="absoluteNodeLabelCapacities" />
       <Field name="reservationsContinueLooking" />
+      <Field name="absoluteCapacityByNodeLabels" />
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 0ded5bd..eb0fb94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -86,6 +86,8 @@ public class TestDistributedShell {
 
     // Setup queue access to node labels
     conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+    conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
+        "100");
     conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
     conf.set(
         "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index c612846..fc0fbb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -395,16 +395,15 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   @Private
   public float getCapacityByNodeLabel(String label) {
-    if (null == parent) {
-      return 1f;
-    }
-    
     if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      if (null == parent) {
+        return 1f;
+      }
       return getCapacity();
     }
     
     if (!capacitiyByNodeLabels.containsKey(label)) {
-      return 0;
+      return 0f;
     } else {
       return capacitiyByNodeLabels.get(label);
     }
@@ -412,18 +411,17 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   @Private
   public float getAbsoluteCapacityByNodeLabel(String label) {
-    if (null == parent) {
-      return 1; 
-    }
-    
     if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      if (null == parent) {
+        return 1f; 
+      }
       return getAbsoluteCapacity();
     }
     
-    if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
-      return 0;
+    if (!absoluteCapacityByNodeLabels.containsKey(label)) {
+      return 0f;
     } else {
-      return absoluteMaxCapacityByNodeLabels.get(label);
+      return absoluteCapacityByNodeLabels.get(label);
     }
   }
   
@@ -433,7 +431,11 @@ public abstract class AbstractCSQueue implements CSQueue {
       return getAbsoluteMaximumCapacity();
     }
     
-    return getAbsoluteCapacityByNodeLabel(label);
+    if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
+      return 0f;
+    } else {
+      return absoluteMaxCapacityByNodeLabels.get(label);
+    }
   }
   
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b36172c..23bf381 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -461,19 +461,8 @@ public class CapacitySchedulerConfiguration extends Configuration {
 
     for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
         .getClusterNodeLabels() : labels) {
-      // capacity of all labels in each queue should be 1
-      if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) {
-        nodeLabelCapacities.put(label, 1.0f);
-        continue;
-      }
       String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
-      float capacity = getFloat(capacityPropertyName, UNDEFINED);
-      if (capacity == UNDEFINED) {
-        throw new IllegalArgumentException("Configuration issue: "
-            + " node-label=" + label + " is accessible from queue=" + queue
-            + " but has no capacity set, you should set " 
-            + capacityPropertyName + " in range of [0, 100].");
-      }
+      float capacity = getFloat(capacityPropertyName, 0f);
       if (capacity < MINIMUM_CAPACITY_VALUE
           || capacity > MAXIMUM_CAPACITY_VALUE) {
         throw new IllegalArgumentException("Illegal capacity of " + capacity
@@ -501,9 +490,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
         .getClusterNodeLabels() : labels) {
       float maxCapacity =
           getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
-              UNDEFINED);
-      maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? 
-          MAXIMUM_CAPACITY_VALUE : maxCapacity;
+              100f);
       if (maxCapacity < MINIMUM_CAPACITY_VALUE
           || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
         throw new IllegalArgumentException("Illegal " + "capacity of "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index b90df8e..0c32c0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -340,6 +340,8 @@ public class TestContainerAllocation {
     
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
 
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
@@ -403,6 +405,9 @@ public class TestContainerAllocation {
     
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
 
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac1be7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index 42db030..cf2e5ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -51,8 +47,9 @@ public class TestQueueParsing {
   
   @Before
   public void setup() {
-    nodeLabelManager = mock(RMNodeLabelsManager.class);
-    when(nodeLabelManager.containsNodeLabel(any(String.class))).thenReturn(true);
+    nodeLabelManager = new MemoryRMNodeLabelsManager();
+    nodeLabelManager.init(new YarnConfiguration());
+    nodeLabelManager.start();
   }
   
   @Test
@@ -255,6 +252,8 @@ public class TestQueueParsing {
   private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
 
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
@@ -271,6 +270,7 @@ public class TestQueueParsing {
     conf.setQueues(A, new String[] {"a1", "a2"});
     conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
     conf.setCapacityByLabel(A, "red", 50);
+    conf.setMaximumCapacityByLabel(A, "red", 50);
     conf.setCapacityByLabel(A, "blue", 50);
     
     conf.setCapacity(A1, 30);
@@ -282,6 +282,7 @@ public class TestQueueParsing {
     conf.setMaximumCapacity(A2, 85);
     conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
     conf.setCapacityByLabel(A2, "red", 50);
+    conf.setMaximumCapacityByLabel(A2, "red", 60);
     
     final String B1 = B + ".b1";
     final String B2 = B + ".b2";
@@ -311,6 +312,8 @@ public class TestQueueParsing {
       CapacitySchedulerConfiguration conf) {
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
 
     // Set A configuration
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
@@ -364,6 +367,7 @@ public class TestQueueParsing {
   
   @Test
   public void testQueueParsingReinitializeWithLabels() throws IOException {
+    nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
     setupQueueConfigurationWithoutLabels(csConf);
@@ -410,6 +414,22 @@ public class TestQueueParsing {
     // queue-B2 inherits "red"/"blue"
     Assert.assertTrue(capacityScheduler.getQueue("b2")
         .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
+    
+    // check capacity of A2
+    CSQueue qA2 = capacityScheduler.getQueue("a2");
+    Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
+    Assert.assertEquals(0.5, qA2.getCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.25, qA2.getAbsoluteCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
+    Assert.assertEquals(0.3, qA2.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
+    
+    // check capacity of B3
+    CSQueue qB3 = capacityScheduler.getQueue("b3");
+    Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.125, qB3.getAbsoluteCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
+    Assert.assertEquals(1, qB3.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
   }
   
   private void
@@ -435,6 +455,8 @@ public class TestQueueParsing {
   
   @Test
   public void testQueueParsingWithLabels() throws IOException {
+    nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
+    
     YarnConfiguration conf = new YarnConfiguration();
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration(conf);
@@ -457,6 +479,8 @@ public class TestQueueParsing {
   
   @Test
   public void testQueueParsingWithLabelsInherit() throws IOException {
+    nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
+
     YarnConfiguration conf = new YarnConfiguration();
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration(conf);
@@ -587,4 +611,52 @@ public class TestQueueParsing {
     ServiceOperations.stopQuietly(capacityScheduler);
     ServiceOperations.stopQuietly(nodeLabelsManager);
   }
+  
+  @Test
+  public void testQueueParsingWithUnusedLabels() throws IOException {
+    final ImmutableSet<String> labels = ImmutableSet.of("red", "blue");
+    
+    // Initialize a cluster with labels, but doesn't use them, reinitialize
+    // shouldn't fail
+    nodeLabelManager.addToCluserNodeLabels(labels);
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT, labels);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    capacityScheduler.setConf(conf);
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
+    capacityScheduler.reinitialize(conf, rmContext);
+    
+    // check root queue's capacity by label -- they should be all zero
+    CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
+    Assert.assertEquals(0, root.getCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0, root.getCapacityByNodeLabel("blue"), DELTA);
+
+    CSQueue a = capacityScheduler.getQueue("a");
+    Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
+
+    CSQueue b1 = capacityScheduler.getQueue("b1");
+    Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals("Parent B has no MAX_CAP", 0.85,
+        b1.getAbsoluteMaximumCapacity(), DELTA);
+
+    CSQueue c12 = capacityScheduler.getQueue("c12");
+    Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(),
+        DELTA);
+    capacityScheduler.stop();
+  }
 }