Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/04/12 23:42:14 UTC

[27/50] hadoop git commit: YARN-6344. Add parameter for rack locality delay in CapacityScheduler. (kkaranasos)

YARN-6344. Add parameter for rack locality delay in CapacityScheduler. (kkaranasos)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7999318a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7999318a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7999318a

Branch: refs/heads/HDFS-7240
Commit: 7999318af12a75b35815461c601d4c25750e8340
Parents: e9ac61c
Author: Konstantinos Karanasos <kk...@apache.org>
Authored: Mon Apr 10 15:25:33 2017 -0700
Committer: Konstantinos Karanasos <kk...@apache.org>
Committed: Mon Apr 10 15:34:44 2017 -0700

----------------------------------------------------------------------
 .../conf/capacity-scheduler.xml                 |  24 ++-
 .../scheduler/SchedulerApplicationAttempt.java  |   5 +
 .../CapacitySchedulerConfiguration.java         |  12 ++
 .../scheduler/capacity/LeafQueue.java           |  16 +-
 .../allocator/RegularContainerAllocator.java    |  41 +++--
 .../scheduler/capacity/TestLeafQueue.java       | 159 ++++++++++++++++++-
 6 files changed, 235 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7999318a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 47db01f..785ed04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -111,9 +111,27 @@
     <value>40</value>
     <description>
       Number of missed scheduling opportunities after which the CapacityScheduler 
-      attempts to schedule rack-local containers. 
-      Typically this should be set to number of nodes in the cluster, By default is setting 
-      approximately number of nodes in one rack which is 40.
+      attempts to schedule rack-local containers.
+      When setting this parameter, the size of the cluster should be taken into account.
+      We use 40 as the default value, which is approximately the number of nodes in one rack.
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
+    <value>-1</value>
+    <description>
+      Number of additional missed scheduling opportunities over the node-locality-delay
+      ones, after which the CapacityScheduler attempts to schedule off-switch containers,
+      instead of rack-local ones.
+      Example: with node-locality-delay=40 and rack-locality-delay=20, the scheduler will
+      attempt rack-local assignments after 40 missed opportunities, and off-switch assignments
+      after 40+20=60 missed opportunities.
+      When setting this parameter, the size of the cluster should be taken into account.
+      We use -1 as the default value, which disables this feature. In this case, the number
+      of missed opportunities for assigning off-switch containers is calculated based on
+      the number of containers and unique locations specified in the resource request,
+      as well as the size of the cluster.
     </description>
   </property>
 

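Taken together, the two properties above define two thresholds of missed scheduling opportunities: one for falling back to rack-local assignments, one for falling back to off-switch assignments, both capped by the cluster size. A minimal standalone sketch of that arithmetic (class and variable names here are illustrative, not scheduler code; the real logic is in RegularContainerAllocator further below):

    // Illustrative arithmetic only; both thresholds are capped by cluster size.
    public class LocalityDelaySketch {
      public static void main(String[] args) {
        int nodeLocalityDelay = 40;           // node-locality-delay
        int rackLocalityAdditionalDelay = 20; // rack-locality-additional-delay
        int numClusterNodes = 1000;

        int rackLocalThreshold = Math.min(numClusterNodes, nodeLocalityDelay);
        int offSwitchThreshold = Math.min(numClusterNodes,
            nodeLocalityDelay + rackLocalityAdditionalDelay);

        // With the example values from the description: rack-local attempts
        // start after 40 missed opportunities, off-switch after 40+20=60.
        System.out.println(rackLocalThreshold + " / " + offSwitchThreshold);
      }
    }
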
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7999318a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 91e29d5..294897f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -1304,6 +1304,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
   }
 
+  public Map<String, ResourceRequest> getResourceRequests(
+      SchedulerRequestKey schedulerRequestKey) {
+    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
+        .getResourceRequests();
+  }
 
   public void incUnconfirmedRes(Resource res) {
     unconfirmedAllocatedMem.addAndGet(res.getMemorySize());

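The new accessor exposes the outstanding ResourceRequests for a scheduler key, keyed by resource name (a host, a rack, or "*" for ResourceRequest.ANY). The allocator change further below uses its size to detect the case where only the ANY ask remains, in which case there is no locality preference left to wait for. A hedged standalone sketch of that check (placeholder value type and names; illustrative only):

    import java.util.HashMap;
    import java.util.Map;

    public class AnyOnlySketch {
      // One entry in the map == only the "*" (ANY) ask is outstanding,
      // so delay scheduling is pointless.
      static boolean shouldDelayScheduling(Map<String, Integer> asks) {
        return asks.size() > 1;
      }

      public static void main(String[] args) {
        Map<String, Integer> asks = new HashMap<>();
        asks.put("*", 3);                                // ANY
        System.out.println(shouldDelayScheduling(asks)); // false
        asks.put("127.0.0.1", 1);                        // node-local preference
        asks.put("rack_1", 1);                           // rack-local preference
        System.out.println(shouldDelayScheduling(asks)); // true
      }
    }
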
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7999318a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 43ec390..9fb92ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -198,6 +198,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
 
   @Private
+  public static final String RACK_LOCALITY_ADDITIONAL_DELAY =
+          PREFIX + "rack-locality-additional-delay";
+
+  @Private
+  public static final int DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY = -1;
+
+  @Private
   public static final String RACK_LOCALITY_FULL_RESET =
       PREFIX + "rack-locality-full-reset";
 
@@ -829,6 +836,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
   }
 
+  public int getRackLocalityAdditionalDelay() {
+    return getInt(RACK_LOCALITY_ADDITIONAL_DELAY,
+        DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY);
+  }
+
   public boolean getRackLocalityFullReset() {
     return getBoolean(RACK_LOCALITY_FULL_RESET,
         DEFAULT_RACK_LOCALITY_FULL_RESET);

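The getter falls back to -1, which keeps the feature disabled. A sketch of setting and reading the new knob through the configuration class, mirroring what the tests further below do (assumes the resourcemanager jar is on the classpath):

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    public class RackDelayConfigSketch {
      public static void main(String[] args) {
        CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
        // Disabled by default: -1 means the old heuristic stays in effect.
        System.out.println(conf.getRackLocalityAdditionalDelay()); // -1
        // Allow 20 extra missed opportunities beyond node-locality-delay
        // before attempting off-switch assignments.
        conf.setInt(
            CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 20);
        System.out.println(conf.getRackLocalityAdditionalDelay()); // 20
      }
    }
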
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7999318a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1b20556..fa515da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -95,6 +95,7 @@ public class LeafQueue extends AbstractCSQueue {
   private float maxAMResourcePerQueuePercent;
 
   private volatile int nodeLocalityDelay;
+  private volatile int rackLocalityAdditionalDelay;
   private volatile boolean rackLocalityFullReset;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@@ -215,6 +216,7 @@ public class LeafQueue extends AbstractCSQueue {
       }
 
       nodeLocalityDelay = conf.getNodeLocalityDelay();
+      rackLocalityAdditionalDelay = conf.getRackLocalityAdditionalDelay();
       rackLocalityFullReset = conf.getRackLocalityFullReset();
 
       // re-init this since max allocation could have changed
@@ -271,9 +273,12 @@ public class LeafQueue extends AbstractCSQueue {
               + "numContainers = " + numContainers
               + " [= currentNumContainers ]" + "\n" + "state = " + getState()
               + " [= configuredState ]" + "\n" + "acls = " + aclsString
-              + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
-              + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
-              .toString() + "\n" + "reservationsContinueLooking = "
+              + " [= configuredAcls ]" + "\n"
+              + "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
+              + "rackLocalityAdditionalDelay = "
+              + rackLocalityAdditionalDelay + "\n"
+              + "labels=" + labelStrBuilder.toString() + "\n"
+              + "reservationsContinueLooking = "
               + reservationsContinueLooking + "\n" + "preemptionDisabled = "
               + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
               + defaultAppPriorityPerQueue + "\npriority = " + priority);
@@ -1347,6 +1352,11 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Lock(NoLock.class)
+  public int getRackLocalityAdditionalDelay() {
+    return rackLocalityAdditionalDelay;
+  }
+
+  @Lock(NoLock.class)
   public boolean getRackLocalityFullReset() {
     return rackLocalityFullReset;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7999318a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 8078bcd..f753d31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -278,6 +278,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         .getCSLeafQueue().getNodeLocalityDelay());
   }
 
+  private int getActualRackLocalityDelay() {
+    return Math.min(rmContext.getScheduler().getNumClusterNodes(),
+        application.getCSLeafQueue().getNodeLocalityDelay()
+        + application.getCSLeafQueue().getRackLocalityAdditionalDelay());
+  }
+
   private boolean canAssign(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
@@ -286,26 +292,37 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       if (reservedContainer != null) {
         return true;
       }
+      // If there are no nodes in the cluster, return false.
+      if (rmContext.getScheduler().getNumClusterNodes() == 0) {
+        return false;
+      }
+      // If we have only ANY requests for this schedulerKey, we should not
+      // delay its scheduling.
+      if (application.getResourceRequests(schedulerKey).size() == 1) {
+        return true;
+      }
 
       // 'Delay' off-switch
       long missedOpportunities =
           application.getSchedulingOpportunities(schedulerKey);
-      long requiredContainers = application.getOutstandingAsksCount(
-          schedulerKey);
 
-      float localityWaitFactor =
-          getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
-              .getNumClusterNodes());
-      // Cap the delay by the number of nodes in the cluster. Under most
-      // conditions this means we will consider each node in the cluster before
-      // accepting an off-switch assignment.
-      return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
-          (requiredContainers * localityWaitFactor)) < missedOpportunities);
+      // If rack locality additional delay parameter is enabled.
+      if (application.getCSLeafQueue().getRackLocalityAdditionalDelay() > -1) {
+        return missedOpportunities > getActualRackLocalityDelay();
+      } else {
+        long requiredContainers =
+            application.getOutstandingAsksCount(schedulerKey);
+        float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
+            rmContext.getScheduler().getNumClusterNodes());
+        // Cap the delay by the number of nodes in the cluster.
+        return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
+            (requiredContainers * localityWaitFactor)) < missedOpportunities);
+      }
     }
 
     // Check if we need containers on this rack
-    if (application.getOutstandingAsksCount(schedulerKey, node.getRackName())
-        <= 0) {
+    if (application.getOutstandingAsksCount(schedulerKey,
+        node.getRackName()) <= 0) {
       return false;
     }
 

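The hunk above is the heart of the change: when rack-locality-additional-delay is set, canAssign() compares the missed scheduling opportunities against a fixed, cluster-size-capped threshold instead of the requiredContainers * localityWaitFactor heuristic. Note the comparison is strict, so off-switch assignment starts on the opportunity after the threshold is reached. A standalone sketch of just that decision (the parameters stand in for scheduler state; the real method also handles reserved containers and the rack/node-level checks):

    public class OffSwitchDelaySketch {
      static boolean canAssignOffSwitch(long missedOpportunities,
          int nodeLocalityDelay, int rackLocalityAdditionalDelay,
          int numClusterNodes, long requiredContainers,
          float localityWaitFactor) {
        if (numClusterNodes == 0) {
          return false; // empty cluster: nothing can be assigned
        }
        if (rackLocalityAdditionalDelay > -1) {
          // New path: fixed threshold, capped by the number of cluster nodes.
          int threshold = Math.min(numClusterNodes,
              nodeLocalityDelay + rackLocalityAdditionalDelay);
          return missedOpportunities > threshold;
        }
        // Default path (-1): the pre-existing heuristic.
        return Math.min(numClusterNodes,
            requiredContainers * localityWaitFactor) < missedOpportunities;
      }

      public static void main(String[] args) {
        // node-locality-delay=40, additional delay=20, 1000-node cluster:
        System.out.println(canAssignOffSwitch(60, 40, 20, 1000, 3, 0f)); // false
        System.out.println(canAssignOffSwitch(61, 40, 20, 1000, 3, 0f)); // true
      }
    }
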
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7999318a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 252666d..a9ed5a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -104,7 +104,7 @@ import org.mockito.Mockito;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
 
-public class TestLeafQueue {  
+public class TestLeafQueue {
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
   private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
@@ -2106,6 +2106,154 @@ public class TestLeafQueue {
   }
 
   @Test
+  public void testRackLocalityDelayScheduling() throws Exception {
+
+    // Change parameter values for node locality and rack locality delay.
+    csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
+    csConf.setInt(
+        CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1);
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
+        csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+        TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, cs.getClusterResource());
+
+    // Manipulate queue 'b'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+    // Check locality parameters.
+    assertEquals(2, a.getNodeLocalityDelay());
+    assertEquals(1, a.getRackLocalityAdditionalDelay());
+
+    // User
+    String user1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId1 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user1, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    a.submitApplicationAttempt(app1, user1);
+
+    // Setup some nodes and racks
+    String host1 = "127.0.0.1";
+    String host2 = "127.0.0.2";
+    String host3 = "127.0.0.3";
+    String host4 = "127.0.0.4";
+    String rack1 = "rack_1";
+    String rack2 = "rack_2";
+    String rack3 = "rack_3";
+    FiCaSchedulerNode node2 = TestUtils.getMockNode(host3, rack2, 0, 8 * GB);
+    FiCaSchedulerNode node3 = TestUtils.getMockNode(host4, rack3, 0, 8 * GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app1.getApplicationAttemptId(), app1);
+    Map<NodeId, FiCaSchedulerNode> nodes =
+        ImmutableMap.of(node2.getNodeID(), node2, node3.getNodeID(), node3);
+
+    final int numNodes = 5;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 16);
+    when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app1Requests1 = new ArrayList<ResourceRequest>();
+    app1Requests1.add(TestUtils.createResourceRequest(host1, 1 * GB, 1,
+        true, priority, recordFactory));
+    app1Requests1.add(TestUtils.createResourceRequest(rack1, 1 * GB, 1,
+        true, priority, recordFactory));
+    app1Requests1.add(TestUtils.createResourceRequest(host2, 1 * GB, 1,
+        true, priority, recordFactory));
+    app1Requests1.add(TestUtils.createResourceRequest(rack2, 1 * GB, 1,
+        true, priority, recordFactory));
+    // Adding one extra in the ANY.
+    app1Requests1.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+        1 * GB, 3, true, priority, recordFactory));
+    app1.updateResourceRequests(app1Requests1);
+
+    // Start testing...
+    CSAssignment assignment = null;
+
+    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
+    assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
+
+    // No rack-local yet.
+    assignment = a.assignContainers(clusterResource, node2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    verifyNoContainerAllocated(assignment);
+    assertEquals(1, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+    // Still no rack-local.
+    assignment = a.assignContainers(clusterResource, node2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(2, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+    // Rack local now.
+    assignment = a.assignContainers(clusterResource, node2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(0, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(2, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.RACK_LOCAL, assignment.getType());
+
+    // No off-switch until 3 missed opportunities.
+    a.assignContainers(clusterResource, node3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    a.assignContainers(clusterResource, node3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assignment = a.assignContainers(clusterResource, node3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(3, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(2, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+
+    // Now off-switch should succeed.
+    assignment = a.assignContainers(clusterResource, node3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(4, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(1, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
+
+    // Check capping by number of cluster nodes.
+    doReturn(10).when(a).getRackLocalityAdditionalDelay();
+    // Off-switch will happen at 6 missed opportunities now, since cluster size
+    // is 5.
+    assignment = a.assignContainers(clusterResource, node3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(5, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(1, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
+    assignment = a.assignContainers(clusterResource, node3,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
+    assertEquals(6, app1.getSchedulingOpportunities(schedulerKey));
+    assertEquals(0, app1.getOutstandingAsksCount(schedulerKey));
+    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
+  }
+
+  @Test
   public void testApplicationPriorityScheduling() throws Exception {
     // Manipulate queue 'a'
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -2410,16 +2558,18 @@ public class TestLeafQueue {
   }
   
   @Test (timeout = 30000)
-  public void testNodeLocalityAfterQueueRefresh() throws Exception {
+  public void testLocalityDelaysAfterQueueRefresh() throws Exception {
 
     // Manipulate queue 'e'
     LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
 
     // before reinitialization
     assertEquals(40, e.getNodeLocalityDelay());
+    assertEquals(-1, e.getRackLocalityAdditionalDelay());
 
-    csConf.setInt(CapacitySchedulerConfiguration
-        .NODE_LOCALITY_DELAY, 60);
+    csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
+    csConf.setInt(
+        CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 600);
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot =
         CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
@@ -2431,6 +2581,7 @@ public class TestLeafQueue {
 
     // after reinitialization
     assertEquals(60, e.getNodeLocalityDelay());
+    assertEquals(600, e.getRackLocalityAdditionalDelay());
   }
 
   @Test (timeout = 30000)

