You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/02/11 07:56:04 UTC

[hadoop] branch trunk updated: YARN-8555. Parameterize TestSchedulingRequestContainerAllocation(Async) to cover both PC handler options. Contributed by Prabhu Joseph.

This is an automated email from the ASF dual-hosted git repository.

wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a1637c  YARN-8555. Parameterize TestSchedulingRequestContainerAllocation(Async) to cover both PC handler options. Contributed by Prabhu Joseph.
0a1637c is described below

commit 0a1637c750a5dbe67f1a9b2969ac1b2d4785a11c
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Mon Feb 11 15:53:50 2019 +0800

    YARN-8555. Parameterize TestSchedulingRequestContainerAllocation(Async) to cover both PC handler options. Contributed by Prabhu Joseph.
---
 .../TestSchedulingRequestContainerAllocation.java  | 228 ++++++++-------------
 ...tSchedulingRequestContainerAllocationAsync.java |  53 +++--
 2 files changed, 116 insertions(+), 165 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
index 26c709f..b1bb515 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -68,28 +70,43 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinali
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
 
+/**
+ * Test Container Allocation with SchedulingRequest.
+ */
+@RunWith(Parameterized.class)
 public class TestSchedulingRequestContainerAllocation {
   private static final int GB = 1024;
-
   private YarnConfiguration conf;
-
+  private String placementConstraintHandler;
   RMNodeLabelsManager mgr;
 
+
+  @Parameters
+  public static Object[] placementConstarintHandlers() {
+    return new Object[] {
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
+  }
+
+  public TestSchedulingRequestContainerAllocation(
+      String placementConstraintHandler) {
+    this.placementConstraintHandler = placementConstraintHandler;
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        this.placementConstraintHandler);
     mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
   }
 
-  @Test
+  @Test(timeout = 30000L)
   public void testIntraAppAntiAffinity() throws Exception {
-    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
-        new Configuration());
-    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {
@@ -120,18 +137,9 @@ public class TestSchedulingRequestContainerAllocation {
         ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
         Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
 
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    // App1 should get 5 containers allocated (1 AM + 1 node each).
-    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+    List<Container> allocated = waitForAllocation(4, 3000, am1, nms);
+    Assert.assertEquals(4, allocated.size());
+    Assert.assertEquals(4, getContainerNodesNum(allocated));
 
     // Similarly, app1 asks 10 anti-affinity containers at different priority,
     // it should be satisfied as well.
@@ -141,38 +149,30 @@ public class TestSchedulingRequestContainerAllocation {
         ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
         Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
 
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    // App1 should get 9 containers allocated (1 AM + 8 containers).
-    Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+    allocated = waitForAllocation(4, 3000, am1, nms);
+    Assert.assertEquals(4, allocated.size());
+    Assert.assertEquals(4, getContainerNodesNum(allocated));
 
     // Test anti-affinity to both of "mapper/reducer", we should only get no
     // container allocated
     am1.allocateIntraAppAntiAffinity(
         ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
         Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
 
-    // App1 should get 10 containers allocated (1 AM + 9 containers).
-    Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+    boolean caughtException = false;
+    try {
+      allocated = waitForAllocation(1, 3000, am1, nms);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+    Assert.assertTrue(caughtException);
 
     rm1.close();
   }
 
-  @Test
+  @Test(timeout = 30000L)
   public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
-    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
-        new Configuration());
-    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {
@@ -203,18 +203,9 @@ public class TestSchedulingRequestContainerAllocation {
         Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
         "tag_1_1", "tag_1_2");
 
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    // App1 should get 3 containers allocated (1 AM + 2 task).
-    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
+    List<Container> allocated = waitForAllocation(2, 3000, am1, nms);
+    Assert.assertEquals(2, allocated.size());
+    Assert.assertEquals(2, getContainerNodesNum(allocated));
 
     // app1 asks for 1 anti-affinity containers for the same app. anti-affinity
     // to tag_1_1/tag_1_2. With allocation_tag = tag_2_1/tag_2_2
@@ -223,33 +214,22 @@ public class TestSchedulingRequestContainerAllocation {
         Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
         "tag_1_1", "tag_1_2");
 
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    // App1 should get 4 containers allocated (1 AM + 2 task (first request) +
-    // 1 task (2nd request).
-    Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
+    List<Container> allocated1 = waitForAllocation(1, 3000, am1, nms);
+    Assert.assertEquals(1, allocated1.size());
+    allocated.addAll(allocated1);
+    Assert.assertEquals(3, getContainerNodesNum(allocated));
 
-    // app1 asks for 10 anti-affinity containers for the same app. anti-affinity
+    // app1 asks for 1 anti-affinity containers for the same app. anti-affinity
     // to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3
     am1.allocateIntraAppAntiAffinity(
         ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
         Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
         "tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
 
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    // App1 should get 1 more containers allocated
-    // 1 AM + 2 task (first request) + 1 task (2nd request) +
-    // 1 task (3rd request)
-    Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+    allocated1 = waitForAllocation(1, 3000, am1, nms);
+    Assert.assertEquals(1, allocated1.size());
+    allocated.addAll(allocated1);
+    Assert.assertEquals(4, getContainerNodesNum(allocated));
 
     rm1.close();
   }
@@ -260,12 +240,9 @@ public class TestSchedulingRequestContainerAllocation {
    * types, see more in TestPlacementConstraintsUtil.
    * @throws Exception
    */
-  @Test
+  @Test(timeout = 30000L)
   public void testInterAppAntiAffinity() throws Exception {
-    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
-        new Configuration());
-    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {
@@ -296,13 +273,9 @@ public class TestSchedulingRequestContainerAllocation {
         ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
         Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
 
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
+    List<Container> allocated = waitForAllocation(3, 3000, am1, nms);
+    Assert.assertEquals(3, allocated.size());
+    Assert.assertEquals(3, getContainerNodesNum(allocated));
 
     System.out.println("Mappers on HOST0: "
         + rmNodes[0].getAllocationTagsWithCount().get("mapper"));
@@ -311,11 +284,6 @@ public class TestSchedulingRequestContainerAllocation {
     System.out.println("Mappers on HOST2: "
         + rmNodes[2].getAllocationTagsWithCount().get("mapper"));
 
-    // App1 should get 4 containers allocated (1 AM + 3 mappers).
-    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
-
     // app2 -> c
     RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
@@ -330,18 +298,17 @@ public class TestSchedulingRequestContainerAllocation {
         Priority.newInstance(1), 1L, allNs.toString(),
         ImmutableSet.of("foo"), "mapper");
 
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
+    List<Container> allocated1 = waitForAllocation(3, 3000, am2, nms);
+    Assert.assertEquals(3, allocated1.size());
+    Assert.assertEquals(1, getContainerNodesNum(allocated1));
+    allocated.addAll(allocated1);
+    Assert.assertEquals(4, getContainerNodesNum(allocated));
+
 
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
         am2.getApplicationAttemptId());
 
-    // App2 should get 4 containers allocated (1 AM + 3 container).
-    Assert.assertEquals(4, schedulerApp2.getLiveContainers().size());
-
     // The allocated node should not have mapper tag.
     Assert.assertTrue(schedulerApp2.getLiveContainers()
         .stream().allMatch(rmContainer -> {
@@ -365,17 +332,11 @@ public class TestSchedulingRequestContainerAllocation {
         Priority.newInstance(1), 1L, allNs.toString(),
         ImmutableSet.of("mapper"), "mapper");
 
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 4; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
-        am3.getApplicationAttemptId());
 
-    // App3 should get 2 containers allocated (1 AM + 1 container).
-    Assert.assertEquals(2, schedulerApp3.getLiveContainers().size());
+    allocated1 = waitForAllocation(1, 3000, am3, nms);
+    Assert.assertEquals(1, allocated1.size());
+    allocated.addAll(allocated1);
+    Assert.assertEquals(4, getContainerNodesNum(allocated));
 
     rm1.close();
   }
@@ -423,12 +384,9 @@ public class TestSchedulingRequestContainerAllocation {
     rm1.close();
   }
 
-  @Test
+  @Test(timeout = 30000L)
   public void testSchedulingRequestWithNullConstraint() throws Exception {
-    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
-        new Configuration());
-    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {
@@ -467,14 +425,8 @@ public class TestSchedulingRequestContainerAllocation {
         .schedulingRequests(ImmutableList.of(sc)).build();
     am1.allocate(request);
 
-    for (int i = 0; i < 4; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
-    }
-
-    FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(2, schedApp.getLiveContainers().size());
-
+    List<Container> allocated = waitForAllocation(1, 3000, am1, nms);
+    Assert.assertEquals(1, allocated.size());
 
     // Send another request with null placement constraint,
     // ensure there is no NPE while handling this request.
@@ -488,22 +440,19 @@ public class TestSchedulingRequestContainerAllocation {
         .schedulingRequests(ImmutableList.of(sc)).build();
     am1.allocate(request1);
 
-    for (int i = 0; i < 4; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
-    }
-
-    Assert.assertEquals(4, schedApp.getLiveContainers().size());
+    allocated = waitForAllocation(2, 3000, am1, nms);
+    Assert.assertEquals(2, allocated.size());
 
     rm1.close();
   }
 
-  private void doNodeHeartbeat(MockNM... nms) throws Exception {
+  private static void doNodeHeartbeat(MockNM... nms) throws Exception {
     for (MockNM nm : nms) {
       nm.nodeHeartbeat(true);
     }
   }
 
-  private List<Container> waitForAllocation(int allocNum, int timeout,
+  public static List<Container> waitForAllocation(int allocNum, int timeout,
       MockAM am, MockNM... nms) throws Exception {
     final List<Container> result = new ArrayList<>();
     GenericTestUtils.waitFor(() -> {
@@ -553,7 +502,7 @@ public class TestSchedulingRequestContainerAllocation {
         .build();
   }
 
-  private int getContainerNodesNum(List<Container> containers) {
+  public static int getContainerNodesNum(List<Container> containers) {
     Set<NodeId> nodes = new HashSet<>();
     if (containers != null) {
       containers.forEach(c -> nodes.add(c.getNodeId()));
@@ -566,12 +515,8 @@ public class TestSchedulingRequestContainerAllocation {
     // This test both intra and inter app constraints.
     // Including simple affinity, anti-affinity, cardinality constraints,
     // and simple AND composite constraints.
-    YarnConfiguration config = new YarnConfiguration();
-    config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
-    config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(config);
+
+    MockRM rm = new MockRM(conf);
     try {
       rm.start();
 
@@ -698,12 +643,8 @@ public class TestSchedulingRequestContainerAllocation {
   @Test(timeout = 30000L)
   public void testMultiAllocationTagsConstraints() throws Exception {
     // This test simulates to use PC to avoid port conflicts
-    YarnConfiguration config = new YarnConfiguration();
-    config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
-    config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(config);
+
+    MockRM rm = new MockRM(conf);
     try {
       rm.start();
 
@@ -779,12 +720,7 @@ public class TestSchedulingRequestContainerAllocation {
   public void testInterAppConstraintsWithNamespaces() throws Exception {
     // This test verifies inter-app constraints with namespaces
     // not-self/app-id/app-tag
-    YarnConfiguration config = new YarnConfiguration();
-    config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
-    config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(config);
+    MockRM rm = new MockRM(conf);
     try {
       rm.start();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
index d1d05dc..95e0a72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -32,38 +33,59 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+import java.util.List;
+
+/**
+ * Test SchedulingRequest With Asynchronous Scheduling.
+ */
+@RunWith(Parameterized.class)
 public class TestSchedulingRequestContainerAllocationAsync {
   private final int GB = 1024;
 
   private YarnConfiguration conf;
+  private String placementConstraintHandler;
 
   RMNodeLabelsManager mgr;
 
+  @Parameters
+  public static Object[] placementConstarintHandlers() {
+    return new Object[] {
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
+  }
+
+  public TestSchedulingRequestContainerAllocationAsync(
+      String placementConstraintHandler) {
+    this.placementConstraintHandler = placementConstraintHandler;
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        this.placementConstraintHandler);
     mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
   }
 
+
   private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
     Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
-        new Configuration());
+        conf);
     csConf.setInt(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
         numThreads);
     csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
         + ".scheduling-interval-ms", 0);
-    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {
@@ -89,24 +111,17 @@ public class TestSchedulingRequestContainerAllocationAsync {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
 
-    // app1 asks for 10 anti-affinity containers for the same app. It should
-    // only get 4 containers allocated because we only have 4 nodes.
+    // app1 asks for 1000 anti-affinity containers for the same app. It should
+    // only get 200 containers allocated because we only have 200 nodes.
     am1.allocateIntraAppAntiAffinity(
         ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)),
         Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
 
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < nNMs; j++) {
-        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
-      }
-    }
-
-    // App1 should get #NM + 1 containers allocated (1 node each + 1 AM).
-    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size());
+    List<Container> allocated = TestSchedulingRequestContainerAllocation.
+            waitForAllocation(nNMs, 6000, am1, nms);
+    Assert.assertEquals(nNMs, allocated.size());
+    Assert.assertEquals(nNMs, TestSchedulingRequestContainerAllocation.
+            getContainerNodesNum(allocated));
 
     rm1.close();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org