You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/02/11 07:56:04 UTC
[hadoop] branch trunk updated: YARN-8555. Parameterize
TestSchedulingRequestContainerAllocation(Async) to cover both PC handler
options. Contributed by Prabhu Joseph.
This is an automated email from the ASF dual-hosted git repository.
wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a1637c YARN-8555. Parameterize TestSchedulingRequestContainerAllocation(Async) to cover both PC handler options. Contributed by Prabhu Joseph.
0a1637c is described below
commit 0a1637c750a5dbe67f1a9b2969ac1b2d4785a11c
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Mon Feb 11 15:53:50 2019 +0800
YARN-8555. Parameterize TestSchedulingRequestContainerAllocation(Async) to cover both PC handler options. Contributed by Prabhu Joseph.
---
.../TestSchedulingRequestContainerAllocation.java | 228 ++++++++-------------
...tSchedulingRequestContainerAllocationAsync.java | 53 +++--
2 files changed, 116 insertions(+), 165 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
index 26c709f..b1bb515 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList;
import java.util.Arrays;
@@ -68,28 +70,43 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinali
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+/**
+ * Test Container Allocation with SchedulingRequest.
+ */
+@RunWith(Parameterized.class)
public class TestSchedulingRequestContainerAllocation {
private static final int GB = 1024;
-
private YarnConfiguration conf;
-
+ private String placementConstraintHandler;
RMNodeLabelsManager mgr;
+
+ @Parameters
+ public static Object[] placementConstarintHandlers() {
+ return new Object[] {
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
+ }
+
+ public TestSchedulingRequestContainerAllocation(
+ String placementConstraintHandler) {
+ this.placementConstraintHandler = placementConstraintHandler;
+ }
+
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ this.placementConstraintHandler);
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
- @Test
+ @Test(timeout = 30000L)
public void testIntraAppAntiAffinity() throws Exception {
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
- new Configuration());
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -120,18 +137,9 @@ public class TestSchedulingRequestContainerAllocation {
ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- // App1 should get 5 containers allocated (1 AM + 1 node each).
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+ List<Container> allocated = waitForAllocation(4, 3000, am1, nms);
+ Assert.assertEquals(4, allocated.size());
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
// Similarly, app1 asks 10 anti-affinity containers at different priority,
// it should be satisfied as well.
@@ -141,38 +149,30 @@ public class TestSchedulingRequestContainerAllocation {
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- // App1 should get 9 containers allocated (1 AM + 8 containers).
- Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+ allocated = waitForAllocation(4, 3000, am1, nms);
+ Assert.assertEquals(4, allocated.size());
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
// Test anti-affinity to both of "mapper/reducer", we should only get no
// container allocated
am1.allocateIntraAppAntiAffinity(
ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
- // App1 should get 10 containers allocated (1 AM + 9 containers).
- Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+ boolean caughtException = false;
+ try {
+ allocated = waitForAllocation(1, 3000, am1, nms);
+ } catch (Exception e) {
+ caughtException = true;
+ }
+ Assert.assertTrue(caughtException);
rm1.close();
}
- @Test
+ @Test(timeout = 30000L)
public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
- new Configuration());
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -203,18 +203,9 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
"tag_1_1", "tag_1_2");
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- // App1 should get 3 containers allocated (1 AM + 2 task).
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
+ List<Container> allocated = waitForAllocation(2, 3000, am1, nms);
+ Assert.assertEquals(2, allocated.size());
+ Assert.assertEquals(2, getContainerNodesNum(allocated));
// app1 asks for 1 anti-affinity containers for the same app. anti-affinity
// to tag_1_1/tag_1_2. With allocation_tag = tag_2_1/tag_2_2
@@ -223,33 +214,22 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
"tag_1_1", "tag_1_2");
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- // App1 should get 4 containers allocated (1 AM + 2 task (first request) +
- // 1 task (2nd request).
- Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
+ List<Container> allocated1 = waitForAllocation(1, 3000, am1, nms);
+ Assert.assertEquals(1, allocated1.size());
+ allocated.addAll(allocated1);
+ Assert.assertEquals(3, getContainerNodesNum(allocated));
- // app1 asks for 10 anti-affinity containers for the same app. anti-affinity
+ // app1 asks for 1 anti-affinity containers for the same app. anti-affinity
// to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3
am1.allocateIntraAppAntiAffinity(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
"tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- // App1 should get 1 more containers allocated
- // 1 AM + 2 task (first request) + 1 task (2nd request) +
- // 1 task (3rd request)
- Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+ allocated1 = waitForAllocation(1, 3000, am1, nms);
+ Assert.assertEquals(1, allocated1.size());
+ allocated.addAll(allocated1);
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
rm1.close();
}
@@ -260,12 +240,9 @@ public class TestSchedulingRequestContainerAllocation {
* types, see more in TestPlacementConstraintsUtil.
* @throws Exception
*/
- @Test
+ @Test(timeout = 30000L)
public void testInterAppAntiAffinity() throws Exception {
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
- new Configuration());
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -296,13 +273,9 @@ public class TestSchedulingRequestContainerAllocation {
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
+ List<Container> allocated = waitForAllocation(3, 3000, am1, nms);
+ Assert.assertEquals(3, allocated.size());
+ Assert.assertEquals(3, getContainerNodesNum(allocated));
System.out.println("Mappers on HOST0: "
+ rmNodes[0].getAllocationTagsWithCount().get("mapper"));
@@ -311,11 +284,6 @@ public class TestSchedulingRequestContainerAllocation {
System.out.println("Mappers on HOST2: "
+ rmNodes[2].getAllocationTagsWithCount().get("mapper"));
- // App1 should get 4 containers allocated (1 AM + 3 mappers).
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
-
// app2 -> c
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
@@ -330,18 +298,17 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(1), 1L, allNs.toString(),
ImmutableSet.of("foo"), "mapper");
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
+ List<Container> allocated1 = waitForAllocation(3, 3000, am2, nms);
+ Assert.assertEquals(3, allocated1.size());
+ Assert.assertEquals(1, getContainerNodesNum(allocated1));
+ allocated.addAll(allocated1);
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
- // App2 should get 4 containers allocated (1 AM + 3 container).
- Assert.assertEquals(4, schedulerApp2.getLiveContainers().size());
-
// The allocated node should not have mapper tag.
Assert.assertTrue(schedulerApp2.getLiveContainers()
.stream().allMatch(rmContainer -> {
@@ -365,17 +332,11 @@ public class TestSchedulingRequestContainerAllocation {
Priority.newInstance(1), 1L, allNs.toString(),
ImmutableSet.of("mapper"), "mapper");
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 4; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
- am3.getApplicationAttemptId());
- // App3 should get 2 containers allocated (1 AM + 1 container).
- Assert.assertEquals(2, schedulerApp3.getLiveContainers().size());
+ allocated1 = waitForAllocation(1, 3000, am3, nms);
+ Assert.assertEquals(1, allocated1.size());
+ allocated.addAll(allocated1);
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
rm1.close();
}
@@ -423,12 +384,9 @@ public class TestSchedulingRequestContainerAllocation {
rm1.close();
}
- @Test
+ @Test(timeout = 30000L)
public void testSchedulingRequestWithNullConstraint() throws Exception {
- Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
- new Configuration());
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -467,14 +425,8 @@ public class TestSchedulingRequestContainerAllocation {
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request);
- for (int i = 0; i < 4; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
- }
-
- FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(2, schedApp.getLiveContainers().size());
-
+ List<Container> allocated = waitForAllocation(1, 3000, am1, nms);
+ Assert.assertEquals(1, allocated.size());
// Send another request with null placement constraint,
// ensure there is no NPE while handling this request.
@@ -488,22 +440,19 @@ public class TestSchedulingRequestContainerAllocation {
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request1);
- for (int i = 0; i < 4; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
- }
-
- Assert.assertEquals(4, schedApp.getLiveContainers().size());
+ allocated = waitForAllocation(2, 3000, am1, nms);
+ Assert.assertEquals(2, allocated.size());
rm1.close();
}
- private void doNodeHeartbeat(MockNM... nms) throws Exception {
+ private static void doNodeHeartbeat(MockNM... nms) throws Exception {
for (MockNM nm : nms) {
nm.nodeHeartbeat(true);
}
}
- private List<Container> waitForAllocation(int allocNum, int timeout,
+ public static List<Container> waitForAllocation(int allocNum, int timeout,
MockAM am, MockNM... nms) throws Exception {
final List<Container> result = new ArrayList<>();
GenericTestUtils.waitFor(() -> {
@@ -553,7 +502,7 @@ public class TestSchedulingRequestContainerAllocation {
.build();
}
- private int getContainerNodesNum(List<Container> containers) {
+ public static int getContainerNodesNum(List<Container> containers) {
Set<NodeId> nodes = new HashSet<>();
if (containers != null) {
containers.forEach(c -> nodes.add(c.getNodeId()));
@@ -566,12 +515,8 @@ public class TestSchedulingRequestContainerAllocation {
// This test both intra and inter app constraints.
// Including simple affinity, anti-affinity, cardinality constraints,
// and simple AND composite constraints.
- YarnConfiguration config = new YarnConfiguration();
- config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
- config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(config);
+
+ MockRM rm = new MockRM(conf);
try {
rm.start();
@@ -698,12 +643,8 @@ public class TestSchedulingRequestContainerAllocation {
@Test(timeout = 30000L)
public void testMultiAllocationTagsConstraints() throws Exception {
// This test simulates to use PC to avoid port conflicts
- YarnConfiguration config = new YarnConfiguration();
- config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
- config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(config);
+
+ MockRM rm = new MockRM(conf);
try {
rm.start();
@@ -779,12 +720,7 @@ public class TestSchedulingRequestContainerAllocation {
public void testInterAppConstraintsWithNamespaces() throws Exception {
// This test verifies inter-app constraints with namespaces
// not-self/app-id/app-tag
- YarnConfiguration config = new YarnConfiguration();
- config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
- config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(config);
+ MockRM rm = new MockRM(conf);
try {
rm.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
index d1d05dc..95e0a72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -32,38 +33,59 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import java.util.List;
+
+/**
+ * Test SchedulingRequest With Asynchronous Scheduling.
+ */
+@RunWith(Parameterized.class)
public class TestSchedulingRequestContainerAllocationAsync {
private final int GB = 1024;
private YarnConfiguration conf;
+ private String placementConstraintHandler;
RMNodeLabelsManager mgr;
+ @Parameters
+ public static Object[] placementConstarintHandlers() {
+ return new Object[] {
+ YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
+ }
+
+ public TestSchedulingRequestContainerAllocationAsync(
+ String placementConstraintHandler) {
+ this.placementConstraintHandler = placementConstraintHandler;
+ }
+
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ this.placementConstraintHandler);
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
+
private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
- new Configuration());
+ conf);
csConf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
numThreads);
csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms", 0);
- csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@@ -89,24 +111,17 @@ public class TestSchedulingRequestContainerAllocationAsync {
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
- // app1 asks for 10 anti-affinity containers for the same app. It should
- // only get 4 containers allocated because we only have 4 nodes.
+ // app1 asks for 1000 anti-affinity containers for the same app. It should
+ // only get 200 containers allocated because we only have 200 nodes.
am1.allocateIntraAppAntiAffinity(
ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < nNMs; j++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
- }
- }
-
- // App1 should get #NM + 1 containers allocated (1 node each + 1 AM).
- FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size());
+ List<Container> allocated = TestSchedulingRequestContainerAllocation.
+ waitForAllocation(nNMs, 6000, am1, nms);
+ Assert.assertEquals(nNMs, allocated.size());
+ Assert.assertEquals(nNMs, TestSchedulingRequestContainerAllocation.
+ getContainerNodesNum(allocated));
rm1.close();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org