You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/09 17:54:09 UTC

[07/30] hadoop git commit: YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against queues. Contributed by Wangda Tan.

YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against queues. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3839a9f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3839a9f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3839a9f

Branch: refs/heads/HDFS-EC
Commit: a3839a9fbfb8eec396b9bf85472d25e0ffc3aab2
Parents: 16b3482
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Thu Nov 6 17:28:12 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Nov 6 17:28:12 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     |   4 +-
 .../scheduler/capacity/TestQueueParsing.java    | 144 ++++++++++++++++++-
 3 files changed, 144 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3839a9f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0c5fc4c..f6b39e3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -882,6 +882,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2812. TestApplicationHistoryServer is likely to fail on less powerful machine.
     (Zhijie Shen via xgong)
 
+    YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against
+    queues. (Wangda Tan via vinodkv)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3839a9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 7159e4d..c612846 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -105,9 +105,9 @@ public abstract class AbstractCSQueue implements CSQueue {
     // inherit from parent if labels not set
     if (this.accessibleLabels == null && parent != null) {
       this.accessibleLabels = parent.getAccessibleNodeLabels();
-      SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
-          this.accessibleLabels);
     }
+    SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
+        this.accessibleLabels);
     
     // inherit from parent if labels not set
     if (this.defaultLabelExpression == null && parent != null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3839a9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index 20a7e53..42db030 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -26,9 +26,11 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -80,7 +82,7 @@ public class TestQueueParsing {
     Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
     Assert.assertEquals(0.7 * 0.55 * 0.7, 
         c12.getAbsoluteMaximumCapacity(), DELTA);
-    capacityScheduler.stop();
+    ServiceOperations.stopQuietly(capacityScheduler);
   }
   
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@@ -164,7 +166,7 @@ public class TestQueueParsing {
     capacityScheduler.init(conf);
     capacityScheduler.start();
     capacityScheduler.reinitialize(conf, null);
-    capacityScheduler.stop();
+    ServiceOperations.stopQuietly(capacityScheduler);
   }
   
   public void testMaxCapacity() throws Exception {
@@ -339,6 +341,27 @@ public class TestQueueParsing {
     conf.setCapacityByLabel(A2, "red", 50);
   }
   
+  private void setupQueueConfigurationWithSingleLevel(
+      CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    // Set A configuration
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+    conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
+    conf.setCapacityByLabel(A, "red", 90);
+    conf.setCapacityByLabel(A, "blue", 90);
+    
+    // Set B configuration
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 90);
+    conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
+    conf.setCapacityByLabel(B, "red", 10);
+    conf.setCapacityByLabel(B, "blue", 10);
+  }
+  
   @Test
   public void testQueueParsingReinitializeWithLabels() throws IOException {
     CapacitySchedulerConfiguration csConf =
@@ -362,7 +385,7 @@ public class TestQueueParsing {
     conf = new YarnConfiguration(csConf);
     capacityScheduler.reinitialize(conf, rmContext);
     checkQueueLabels(capacityScheduler);
-    capacityScheduler.stop();
+    ServiceOperations.stopQuietly(capacityScheduler);
   }
   
   private void checkQueueLabels(CapacityScheduler capacityScheduler) {
@@ -429,7 +452,7 @@ public class TestQueueParsing {
     capacityScheduler.init(csConf);
     capacityScheduler.start();
     checkQueueLabels(capacityScheduler);
-    capacityScheduler.stop();
+    ServiceOperations.stopQuietly(capacityScheduler);
   }
   
   @Test
@@ -451,6 +474,117 @@ public class TestQueueParsing {
     capacityScheduler.init(csConf);
     capacityScheduler.start();
     checkQueueLabelsInheritConfig(capacityScheduler);
-    capacityScheduler.stop();
+    ServiceOperations.stopQuietly(capacityScheduler);
+  }
+  
+  @Test(expected = Exception.class)
+  public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
+      throws IOException {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationWithLabels(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    
+    RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
+    nodeLabelsManager.init(conf);
+    nodeLabelsManager.start();
+    
+    rmContext.setNodeLabelManager(nodeLabelsManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+    ServiceOperations.stopQuietly(nodeLabelsManager);
+  }
+  
+  @Test(expected = Exception.class)
+  public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
+      throws IOException {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationWithLabelsInherit(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    
+    RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
+    nodeLabelsManager.init(conf);
+    nodeLabelsManager.start();
+    
+    rmContext.setNodeLabelManager(nodeLabelsManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+    ServiceOperations.stopQuietly(nodeLabelsManager);
+  }
+  
+  @Test(expected = Exception.class)
+  public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
+      throws IOException {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationWithSingleLevel(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    
+    RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
+    nodeLabelsManager.init(conf);
+    nodeLabelsManager.start();
+    
+    rmContext.setNodeLabelManager(nodeLabelsManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+    ServiceOperations.stopQuietly(nodeLabelsManager);
+  }
+  
+  @Test(expected = Exception.class)
+  public void testQueueParsingWhenLabelsNotExist() throws IOException {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationWithLabels(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    
+    RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
+    nodeLabelsManager.init(conf);
+    nodeLabelsManager.start();
+    
+    rmContext.setNodeLabelManager(nodeLabelsManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+    ServiceOperations.stopQuietly(nodeLabelsManager);
   }
 }