You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/24 22:48:35 UTC
[34/42] hadoop git commit: YARN-6599. Support anti-affinity
constraint via AppPlacementAllocator. (Wangda Tan via asuresh)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index a3b88c0..01d5e6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -170,7 +170,7 @@ public class TestCapacitySchedulerAutoQueueCreation
1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1),
- Collections.<ContainerId>emptyList(), Collections.singletonList(host),
+ null, Collections.<ContainerId>emptyList(), Collections.singletonList(host),
null, NULL_UPDATE_REQUESTS);
//And this will result in container assignment for app1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
new file mode 100644
index 0000000..b6ac4b6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class TestCapacitySchedulerSchedulingRequestUpdate
+ extends CapacitySchedulerTestBase {
+ @Test
+ public void testBasicPendingResourceUpdate() throws Exception {
+ Configuration conf = TestUtils.getConfigurationWithQueueLabels(
+ new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ MockRM rm = new MockRM(conf) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 = // label = x
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 = // label = ""
+ new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+ nm2.registerNode();
+
+ // Launch app1 in queue=a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // Launch app2 in queue=b1
+ RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+ // am1 asks for 8 * 1GB container for no label
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"),
+ "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "root", 8 * GB, null);
+
+ // am2 asks for 8 * 1GB container for no label
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 8)), null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "b1", 8 * GB, null);
+ checkPendingResource(rm, "b", 8 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 16 * GB, null);
+
+ // am2 asks for 8 * 1GB container in another priority for no label
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(2), "*",
+ Resources.createResource(1 * GB), 8)), null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 24 * GB, null);
+
+ // am1 asks 4 GB resource instead of 8 * GB for priority=1
+ // am1 asks for 8 * 1GB container for no label
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"),
+ "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 4 * GB, null);
+ checkPendingResource(rm, "a", 4 * GB, null);
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 20 * GB, null);
+
+ // am1 asks 8 * GB resource which label=x
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(2), "*",
+ Resources.createResource(8 * GB), 1, true, "x")), null);
+
+ checkPendingResource(rm, "a1", 4 * GB, null);
+ checkPendingResource(rm, "a", 4 * GB, null);
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 20 * GB, null);
+ checkPendingResource(rm, "root", 8 * GB, "x");
+
+ // complete am1/am2, pending resource should be 0 now
+ AppAttemptRemovedSchedulerEvent appRemovedEvent =
+ new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(),
+ RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+ appRemovedEvent = new AppAttemptRemovedSchedulerEvent(
+ am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+
+ checkPendingResource(rm, "a1", 0 * GB, null);
+ checkPendingResource(rm, "a", 0 * GB, null);
+ checkPendingResource(rm, "a1", 0 * GB, "x");
+ checkPendingResource(rm, "a", 0 * GB, "x");
+ checkPendingResource(rm, "b1", 0 * GB, null);
+ checkPendingResource(rm, "b", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, "x");
+ }
+
+ @Test
+ public void testNodePartitionPendingResourceUpdate() throws Exception {
+ Configuration conf = TestUtils.getConfigurationWithQueueLabels(
+ new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ MockRM rm = new MockRM(conf) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 = // label = x
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 = // label = ""
+ new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+ nm2.registerNode();
+
+ // Launch app1 in queue=a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // Launch app2 in queue=b1
+ RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+ // am1 asks for 8 * 1GB container for "x"
+ am1.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "root", 8 * GB, "x");
+
+ // am2 asks for 8 * 1GB container for "x"
+ am2.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "b1", 8 * GB, "x");
+ checkPendingResource(rm, "b", 8 * GB, "x");
+ // root = a + b
+ checkPendingResource(rm, "root", 16 * GB, "x");
+
+ // am1 asks for 6 * 1GB container for "x" in another priority
+ am1.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(6, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(2), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 14 * GB, "x");
+ checkPendingResource(rm, "a", 14 * GB, "x");
+ checkPendingResource(rm, "b1", 8 * GB, "x");
+ checkPendingResource(rm, "b", 8 * GB, "x");
+ // root = a + b
+ checkPendingResource(rm, "root", 22 * GB, "x");
+
+ // am1 asks for 4 * 1GB container for "x" in priority=1, which should
+ // override 8 * 1GB
+ am1.allocateIntraAppAntiAffinity("x",
+ ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)),
+ Priority.newInstance(1), 0, "mapper", "reducer");
+
+ checkPendingResource(rm, "a1", 10 * GB, "x");
+ checkPendingResource(rm, "a", 10 * GB, "x");
+ checkPendingResource(rm, "b1", 8 * GB, "x");
+ checkPendingResource(rm, "b", 8 * GB, "x");
+ // root = a + b
+ checkPendingResource(rm, "root", 18 * GB, "x");
+
+ // complete am1/am2, pending resource should be 0 now
+ AppAttemptRemovedSchedulerEvent appRemovedEvent =
+ new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(),
+ RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+ appRemovedEvent = new AppAttemptRemovedSchedulerEvent(
+ am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+
+ checkPendingResource(rm, "a1", 0 * GB, null);
+ checkPendingResource(rm, "a", 0 * GB, null);
+ checkPendingResource(rm, "a1", 0 * GB, "x");
+ checkPendingResource(rm, "a", 0 * GB, "x");
+ checkPendingResource(rm, "b1", 0 * GB, null);
+ checkPendingResource(rm, "b", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, "x");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index d2e28be..a800bef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -132,7 +132,7 @@ public class TestIncreaseAllocationExpirer {
Assert.assertEquals(RMContainerState.RUNNING,
rm1.getResourceScheduler().getRMContainer(containerId2).getState());
// Verify container size is 3G
- Assert.assertEquals(
+ Assert.assertEquals(
3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize());
// Verify total resource usage
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
new file mode 100644
index 0000000..0a44a1e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchedulingRequestContainerAllocation {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ @Test
+ public void testIntraAppAntiAffinity() throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+ csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 4 NMs.
+ MockNM[] nms = new MockNM[4];
+ RMNode[] rmNodes = new RMNode[4];
+ for (int i = 0; i < 4; i++) {
+ nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 10 anti-affinity containers for the same app. It should
+ // only get 4 containers allocated because we only have 4 nodes.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 5 containers allocated (1 AM + 1 node each).
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+
+ // Similarly, app1 asks 10 anti-affinity containers at different priority,
+ // it should be satisfied as well.
+ // app1 asks for 10 anti-affinity containers for the same app. It should
+ // only get 4 containers allocated because we only have 4 nodes.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
+ Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 9 containers allocated (1 AM + 8 containers).
+ Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+
+ // Test anti-affinity to both of "mapper/reducer", we should only get no
+ // container allocated
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
+ Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 10 containers allocated (1 AM + 9 containers).
+ Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+ csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 4 NMs.
+ MockNM[] nms = new MockNM[4];
+ RMNode[] rmNodes = new RMNode[4];
+ for (int i = 0; i < 4; i++) {
+ nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 2 anti-affinity containers for the same app.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
+ "tag_1_1", "tag_1_2");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 3 containers allocated (1 AM + 2 task).
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
+
+ // app1 asks for 1 anti-affinity containers for the same app. anti-affinity
+ // to tag_1_1/tag_1_2. With allocation_tag = tag_2_1/tag_2_2
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+ Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
+ "tag_1_1", "tag_1_2");
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 4 containers allocated (1 AM + 2 task (first request) +
+ // 1 task (2nd request).
+ Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
+
+ // app1 asks for 10 anti-affinity containers for the same app. anti-affinity
+ // to tag_1_1/tag_1_2/tag_2_1/tag_2_2. With allocation_tag = tag_3
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+ Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
+ "tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 4; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get 1 more containers allocated
+ // 1 AM + 2 task (first request) + 1 task (2nd request) +
+ // 1 task (3rd request)
+ Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testSchedulingRequestDisabledByDefault() throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 4 NMs.
+ MockNM[] nms = new MockNM[4];
+ RMNode[] rmNodes = new RMNode[4];
+ for (int i = 0; i < 4; i++) {
+ nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 2 anti-affinity containers for the same app.
+ boolean caughtException = false;
+ try {
+ // Since feature is disabled by default, we should expect exception.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
+ "tag_1_1", "tag_1_2");
+ } catch (Exception e) {
+ caughtException = true;
+ }
+ Assert.assertTrue(caughtException);
+ rm1.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
new file mode 100644
index 0000000..c7f13cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchedulingRequestContainerAllocationAsync {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
+ Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+ new Configuration());
+ csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
+ csConf.setInt(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+ numThreads);
+ csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ + ".scheduling-interval-ms", 0);
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ // 200 NMs.
+ int nNMs = 200;
+ MockNM[] nms = new MockNM[nNMs];
+ RMNode[] rmNodes = new RMNode[nNMs];
+ for (int i = 0; i < nNMs; i++) {
+ nms[i] = rm1.registerNode("127.0.0." + i + ":1234", 10 * GB);
+ rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+ }
+
+ // app1 -> c
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+ // app1 asks for 10 anti-affinity containers for the same app. It should
+ // only get 4 containers allocated because we only have 4 nodes.
+ am1.allocateIntraAppAntiAffinity(
+ ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)),
+ Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < nNMs; j++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+ }
+ }
+
+ // App1 should get #NM + 1 containers allocated (1 node each + 1 AM).
+ FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testSingleThreadAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(1);
+ }
+
+ @Test(timeout = 300000)
+ public void testTwoThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(2);
+ }
+
+ @Test(timeout = 300000)
+ public void testThreeThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(3);
+ }
+
+ @Test(timeout = 300000)
+ public void testFourThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(4);
+ }
+
+ @Test(timeout = 300000)
+ public void testFiveThreadsAsyncContainerAllocation() throws Exception {
+ testIntraAppAntiAffinityAsync(5);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e8734cc..542ba3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -275,6 +275,8 @@ public class TestUtils {
public static Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
+ conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+ true);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index f1d5663..7afe4ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -20,10 +20,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
-import java.util.List;
-
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -33,7 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.ImmutableSet;
+import java.util.List;
/**
* Test functionality of AllocationTagsManager.
@@ -54,7 +54,6 @@ public class TestAllocationTagsManager {
rmContext = rm.getRMContext();
}
-
@Test
public void testAllocationTagsManagerSimpleCases()
throws InvalidAllocationTagsQueryException {
@@ -141,30 +140,31 @@ public class TestAllocationTagsManager {
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
- Assert
- .assertEquals(2,
- atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1),
- ImmutableSet.of(AllocationTagsNamespaces.APP_ID
- + TestUtils.getMockApplicationId(1).toString()),
- Long::max));
+ Assert.assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(1), null, Long::max));
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(1), null, Long::max));
+
+ // Get Cardinality of app1 on node2, with empty tag set, op=max
+ Assert.assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
- Assert.assertEquals(7, atm.getNodeCardinalityByOp(
+ Assert.assertEquals(4, atm.getNodeCardinalityByOp(
NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
- Assert.assertEquals(5,
+ Assert.assertEquals(3,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
- Assert.assertEquals(2,
+ Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
@@ -296,7 +296,7 @@ public class TestAllocationTagsManager {
Assert.assertEquals(3, atm.getRackCardinality("rack0", null, "reducer"));
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
- Assert.assertEquals(2, atm.getRackCardinalityByOp("rack0",
+ Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index 7492233..8ad726e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -117,9 +117,9 @@ public class TestPlacementConstraintsUtil {
RMNode currentNode = nodeIterator.next();
FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode, pcm, tm));
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode, pcm, tm));
}
/**
@@ -145,14 +145,14 @@ public class TestPlacementConstraintsUtil {
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now SUCCEED
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode0, pcm, tm));
// FAIL on the rest of the nodes
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode1, pcm, tm));
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode2, pcm, tm));
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode3, pcm, tm));
}
@@ -187,15 +187,15 @@ public class TestPlacementConstraintsUtil {
FiCaSchedulerNode schedulerNode3 = TestUtils
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 'zk' placement on Rack1 should now SUCCEED
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode0, pcm, tm));
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode1, pcm, tm));
// FAIL on the rest of the RACKs
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode2, pcm, tm));
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode3, pcm, tm));
}
@@ -230,14 +230,14 @@ public class TestPlacementConstraintsUtil {
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now FAIL
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode0, pcm, tm));
// SUCCEED on the rest of the nodes
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode1, pcm, tm));
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode2, pcm, tm));
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag1, schedulerNode3, pcm, tm));
}
@@ -273,15 +273,15 @@ public class TestPlacementConstraintsUtil {
.getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
// 'zk' placement on Rack1 should FAIL
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode0, pcm, tm));
- Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode1, pcm, tm));
// SUCCEED on the rest of the RACKs
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode2, pcm, tm));
- Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
sourceTag2, schedulerNode3, pcm, tm));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 5f29186..b998564 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -192,7 +192,7 @@ public class FairSchedulerTestBase {
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
- scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
+ scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
return id;
@@ -222,7 +222,7 @@ public class FairSchedulerTestBase {
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
- scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
+ scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
return id;
}
@@ -245,7 +245,7 @@ public class FairSchedulerTestBase {
ResourceRequest request, ApplicationAttemptId attId) {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.add(request);
- scheduler.allocate(attId, ask, new ArrayList<ContainerId>(),
+ scheduler.allocate(attId, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index 95dbaea..2512787 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -125,7 +125,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
List<ResourceRequest> ask = new ArrayList<>();
ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
scheduler.allocate(
- appAttemptId, ask, new ArrayList<ContainerId>(),
+ appAttemptId, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@@ -163,8 +163,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
ask.add(request);
- scheduler.allocate(appAttemptId, ask,
- new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+ scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@@ -175,8 +174,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
ask.clear();
ask.add(request);
- scheduler.allocate(appAttemptId, ask,
- new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+ scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
checkAppConsumption(app, Resources.createResource(2048,2));
@@ -373,7 +371,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
true);
ask1.add(request1);
ask1.add(request2);
- scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
+ scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null, null,
NULL_UPDATE_REQUESTS);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 77b6d04..d9c06a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -1280,7 +1280,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
- scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
+ scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
null, NULL_UPDATE_REQUESTS);
ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
@@ -2125,7 +2125,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ResourceRequest request1 = createResourceRequest(minReqSize * 2,
ResourceRequest.ANY, 1, 1, true);
ask1.add(request1);
- scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(),
+ scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// Second ask, queue2 requests 1 large.
@@ -2141,7 +2141,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ResourceRequest.ANY, 1, 1, false);
ask2.add(request2);
ask2.add(request3);
- scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(),
+ scheduler.allocate(id21, ask2, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// Third ask, queue2 requests 2 small (minReqSize).
@@ -2157,7 +2157,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ResourceRequest.ANY, 2, 2, true);
ask3.add(request4);
ask3.add(request5);
- scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(),
+ scheduler.allocate(id22, ask3, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
@@ -2683,7 +2683,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Complete the first container so we can trigger allocation for app2
ContainerId containerId =
app1.getLiveContainers().iterator().next().getContainerId();
- scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(),
+ scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(), null,
Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS);
// Trigger allocation for app2
@@ -2769,7 +2769,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
- scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
+ scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
null, NULL_UPDATE_REQUESTS);
// node 1 checks in
@@ -3216,7 +3216,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createResourceRequest(1024, node1.getHostName(), 1, 0, true),
createResourceRequest(1024, "rack1", 1, 0, true),
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
- scheduler.allocate(attId1, update, new ArrayList<ContainerId>(),
+ scheduler.allocate(attId1, update, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// then node2 should get the container
@@ -4432,7 +4432,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createResourceRequest(1024, 8, ResourceRequest.ANY, 1, 1, true);
ask1.add(request1);
- scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null,
+ scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null,
null, NULL_UPDATE_REQUESTS);
String hostName = "127.0.0.1";
@@ -4508,11 +4508,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Verify the blacklist can be updated independent of requesting containers
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
- Collections.<ContainerId>emptyList(),
+ null, Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
assertTrue(app.isPlaceBlacklisted(host));
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
- Collections.<ContainerId>emptyList(), null,
+ null, Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
assertFalse(scheduler.getSchedulerApp(appAttemptId)
.isPlaceBlacklisted(host));
@@ -4521,8 +4521,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createResourceRequest(GB, node.getHostName(), 1, 0, true));
// Verify a container does not actually get placed on the blacklisted host
- scheduler.allocate(appAttemptId, update,
- Collections.<ContainerId>emptyList(),
+ scheduler.allocate(appAttemptId, update, null, Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
assertTrue(app.isPlaceBlacklisted(host));
scheduler.update();
@@ -4531,8 +4530,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
.getLiveContainers().size());
// Verify a container gets placed on the empty blacklist
- scheduler.allocate(appAttemptId, update,
- Collections.<ContainerId>emptyList(), null,
+ scheduler.allocate(appAttemptId, update, null, Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
assertFalse(app.isPlaceBlacklisted(host));
createSchedulingRequest(GB, "root.default", "user", 1);
@@ -5391,8 +5389,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ask1.add(request3);
// Perform allocation
- scheduler.allocate(appAttemptId, ask1, new ArrayList<ContainerId>(), null,
- null, NULL_UPDATE_REQUESTS);
+ scheduler.allocate(appAttemptId, ask1, null, new ArrayList<ContainerId>(),
+ null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
scheduler.handle(new NodeUpdateSchedulerEvent(node));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index db749ac..8814c0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -281,7 +281,7 @@ public class TestFifoScheduler {
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
- scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
+ scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
@@ -378,7 +378,7 @@ public class TestFifoScheduler {
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
- scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
+ scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// Before the node update event, there are one local request
@@ -954,7 +954,7 @@ public class TestFifoScheduler {
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1,
RMNodeLabelsManager.NO_LABEL));
- fs.allocate(appAttemptId1, ask1, emptyId,
+ fs.allocate(appAttemptId1, ask1, null, emptyId,
Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS);
// Trigger container assignment
@@ -963,7 +963,7 @@ public class TestFifoScheduler {
// Get the allocation for the application and verify no allocation on
// blacklist node
Allocation allocation1 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
@@ -971,7 +971,7 @@ public class TestFifoScheduler {
// verify host_1_1 can get allocated as not in blacklist
fs.handle(new NodeUpdateSchedulerEvent(n4));
Allocation allocation2 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
List<Container> containerList = allocation2.getContainers();
@@ -986,33 +986,33 @@ public class TestFifoScheduler {
// be assigned
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
- fs.allocate(appAttemptId1, ask2, emptyId,
+ fs.allocate(appAttemptId1, ask2, null, emptyId,
Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS);
// verify n1 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n1));
Allocation allocation3 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
// verify n2 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n2));
Allocation allocation4 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
// verify n3 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n3));
Allocation allocation5 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
fs.handle(new NodeUpdateSchedulerEvent(n4));
Allocation allocation6 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
@@ -1072,14 +1072,14 @@ public class TestFifoScheduler {
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
- fs.allocate(appAttemptId1, ask1, emptyId,
+ fs.allocate(appAttemptId1, ask1, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
// Ask for a 2 GB container for app 2
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
- fs.allocate(appAttemptId2, ask2, emptyId,
+ fs.allocate(appAttemptId2, ask2, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
// Trigger container assignment
@@ -1087,13 +1087,13 @@ public class TestFifoScheduler {
// Get the allocation for the applications and verify headroom
Allocation allocation1 =
- fs.allocate(appAttemptId1, emptyAsk, emptyId,
+ fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("Allocation headroom", 1 * GB, allocation1
.getResourceLimit().getMemorySize());
Allocation allocation2 =
- fs.allocate(appAttemptId2, emptyAsk, emptyId,
+ fs.allocate(appAttemptId2, emptyAsk, null, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("Allocation headroom", 1 * GB, allocation2
.getResourceLimit().getMemorySize());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/390a6a45/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
new file mode 100644
index 0000000..479d2c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.function.LongBinaryOperator;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test behaviors of single constraint app placement allocator.
+ */
+public class TestSingleConstraintAppPlacementAllocator {
+ private AppSchedulingInfo appSchedulingInfo;
+ private AllocationTagsManager spyAllocationTagsManager;
+ private RMContext rmContext;
+ private SchedulerRequestKey schedulerRequestKey;
+ private SingleConstraintAppPlacementAllocator allocator;
+
+ @Before
+ public void setup() throws Exception {
+ // stub app scheduling info.
+ appSchedulingInfo = mock(AppSchedulingInfo.class);
+ when(appSchedulingInfo.getApplicationId()).thenReturn(
+ TestUtils.getMockApplicationId(1));
+ when(appSchedulingInfo.getApplicationAttemptId()).thenReturn(
+ TestUtils.getMockApplicationAttemptId(1, 1));
+
+ // stub RMContext
+ rmContext = TestUtils.getMockRMContext();
+
+ // Create allocation tags manager
+ AllocationTagsManager allocationTagsManager = new AllocationTagsManager(
+ rmContext);
+ spyAllocationTagsManager = spy(allocationTagsManager);
+ schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L,
+ TestUtils.getMockContainerId(1, 1));
+ rmContext.setAllocationTagsManager(spyAllocationTagsManager);
+
+ // Create allocator
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ }
+
+ private void assertValidSchedulingRequest(
+ SchedulingRequest schedulingRequest) {
+ // Create allocator to avoid fields polluted by previous runs
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ }
+
+ private void assertInvalidSchedulingRequest(
+ SchedulingRequest schedulingRequest, boolean recreateAllocator) {
+ try {
+ // Create allocator
+ if (recreateAllocator) {
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ }
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ } catch (SchedulerInvalidResoureRequestException e) {
+ // Expected
+ return;
+ }
+ Assert.fail(
+ "Expect failure for schedulingRequest=" + schedulingRequest.toString());
+ }
+
+ @Test
+ public void testSchedulingRequestValidation() {
+ // Valid
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("", allocator.getTargetNodePartition());
+
+ // Valid (with partition)
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition("x"))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("x", allocator.getTargetNodePartition());
+
+ // Valid (without specifying node partition)
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer")).build())
+ .resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("", allocator.getTargetNodePartition());
+
+ // Valid (with application Id target)
+ assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer")).build())
+ .resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build());
+ // Allocation tags should not include application Id
+ Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+ allocator.getTargetAllocationTags());
+ Assert.assertEquals("", allocator.getTargetNodePartition());
+
+ // Invalid (without sizing)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer")).build())
+ .build(), true);
+
+ // Invalid (without target tags)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1).build())
+ .build(), true);
+
+ // Invalid (with multiple allocation tags expression specified)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (with multiple node partition target expression specified)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp(""),
+ PlacementConstraints.PlacementTargets.nodePartition("x"))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not anti-affinity cardinality)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 1, 2,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not anti-affinity cardinality)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 2,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not NODE scope)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.RACK, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+
+ // Invalid (not GUARANTEED)
+ assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build(), true);
+ }
+
+ @Test
+ public void testSchedulingRequestUpdate() {
+ SchedulingRequest schedulingRequest =
+ SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+ // Update allocator with exactly same scheduling request, should succeeded.
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+ // Update allocator with scheduling request different at #allocations,
+ // should succeeded.
+ schedulingRequest.getResourceSizing().setNumAllocations(10);
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+ // Update allocator with scheduling request different at resource,
+ // should failed.
+ schedulingRequest.getResourceSizing().setResources(
+ Resource.newInstance(2048, 1));
+ assertInvalidSchedulingRequest(schedulingRequest, false);
+
+ // Update allocator with a different placement target (allocator tag),
+ // should failed
+ schedulingRequest = SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ assertInvalidSchedulingRequest(schedulingRequest, false);
+
+ // Update allocator with recover == true
+ int existingNumAllocations =
+ allocator.getSchedulingRequest().getResourceSizing()
+ .getNumAllocations();
+ schedulingRequest = SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, true);
+ Assert.assertEquals(existingNumAllocations + 1,
+ allocator.getSchedulingRequest().getResourceSizing()
+ .getNumAllocations());
+ }
+
+ @Test
+ public void testFunctionality() throws InvalidAllocationTagsQueryException {
+ SchedulingRequest schedulingRequest =
+ SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition(""))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ allocator.canAllocate(NodeType.NODE_LOCAL,
+ TestUtils.getMockNode("host1", "/rack1", 123, 1024));
+ verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp(
+ eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)),
+ eq(ImmutableSet.of("mapper", "reducer")),
+ any(LongBinaryOperator.class));
+
+ allocator = new SingleConstraintAppPlacementAllocator();
+ allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ // Valid (with partition)
+ schedulingRequest = SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(10L).priority(Priority.newInstance(1))
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp("mapper", "reducer"),
+ PlacementConstraints.PlacementTargets.nodePartition("x"))
+ .build()).resourceSizing(
+ ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+ .build();
+ allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+ allocator.canAllocate(NodeType.NODE_LOCAL,
+ TestUtils.getMockNode("host1", "/rack1", 123, 1024));
+ verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp(
+ eq(NodeId.fromString("host1:123")),
+ eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet
+ .of("mapper", "reducer")), any(LongBinaryOperator.class));
+
+ SchedulerNode node1 = mock(SchedulerNode.class);
+ when(node1.getPartition()).thenReturn("x");
+ when(node1.getNodeID()).thenReturn(NodeId.fromString("host1:123"));
+
+ Assert.assertTrue(allocator
+ .precheckNode(node1, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+ SchedulerNode node2 = mock(SchedulerNode.class);
+ when(node1.getPartition()).thenReturn("");
+ when(node1.getNodeID()).thenReturn(NodeId.fromString("host2:123"));
+ Assert.assertFalse(allocator
+ .precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org