You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:25 UTC
[08/50] [abbrv] hadoop git commit: YARN-7494. Add muti-node lookup
mechanism and pluggable nodes sorting policies to optimize placement
decision. Contributed by Sunil Govindan.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java
new file mode 100644
index 0000000..d765af8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Comparator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * A {@link MultiNodeLookupPolicy} that keeps, for every partition, a set of
+ * nodes sorted by their resource usage.
+ *
+ * <p>
+ * Nodes with a smaller allocated resource are placed first; nodes whose
+ * allocated resources compare as equal are ordered by node id. The sorted set
+ * of a partition is rebuilt each time
+ * {@link #addAndRefreshNodesSet(Collection, String)} is invoked for it.
+ * </p>
+ *
+ * @param <N> type of scheduler node handled by this policy
+ */
+public class ResourceUsageMultiNodeLookupPolicy<N extends SchedulerNode>
+    implements MultiNodeLookupPolicy<N> {
+
+  /** Sorted, unmodifiable set of nodes, keyed by partition. */
+  protected Map<String, Set<N>> nodesPerPartition = new ConcurrentHashMap<>();
+
+  /** Ordering used whenever the sorted set of a partition is built. */
+  protected Comparator<N> comparator;
+
+  /**
+   * Creates the policy and installs the comparator: allocated resource first,
+   * node id as the tie-breaker.
+   */
+  public ResourceUsageMultiNodeLookupPolicy() {
+    this.comparator = (first, second) -> {
+      int byAllocation = first.getAllocatedResource()
+          .compareTo(second.getAllocatedResource());
+      if (byAllocation != 0) {
+        return byAllocation;
+      }
+      // Equal usage: fall back to the node id so the order stays stable.
+      return first.getNodeID().compareTo(second.getNodeID());
+    };
+  }
+
+  /**
+   * Returns an iterator over the sorted nodes stored for the partition.
+   *
+   * @param nodes not consulted by this implementation
+   * @param partition partition whose sorted nodes are iterated
+   * @return iterator over the stored set, or over an empty set when nothing
+   *         is stored for the partition
+   */
+  @Override
+  public Iterator<N> getPreferredNodeIterator(Collection<N> nodes,
+      String partition) {
+    Set<N> sortedNodes = getNodesPerPartition(partition);
+    return sortedNodes.iterator();
+  }
+
+  /**
+   * Sorts the given nodes with the policy comparator and stores the result,
+   * wrapped as an unmodifiable set, as the new content of the partition.
+   *
+   * @param nodes nodes that belong to the partition
+   * @param partition partition whose stored set is replaced
+   */
+  @Override
+  public void addAndRefreshNodesSet(Collection<N> nodes,
+      String partition) {
+    Set<N> sortedSnapshot = new ConcurrentSkipListSet<>(comparator);
+    sortedSnapshot.addAll(nodes);
+    Set<N> readOnlyView = Collections.unmodifiableSet(sortedSnapshot);
+    nodesPerPartition.put(partition, readOnlyView);
+  }
+
+  /**
+   * Looks up the sorted nodes stored for the partition.
+   *
+   * @param partition partition to look up
+   * @return the stored set, or an empty set when the partition is unknown
+   */
+  @Override
+  public Set<N> getNodesPerPartition(String partition) {
+    Set<N> noNodes = Collections.emptySet();
+    return nodesPerPartition.getOrDefault(partition, noNodes);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index eef86a4..09d3327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -295,6 +296,8 @@ public class ReservationSystemTestUtil {
});
mockRmContext.setNodeLabelManager(nlm);
+ mockRmContext
+ .setMultiNodeSortingManager(mock(MultiNodeSortingManager.class));
return mockRmContext;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index b7b0eb7..df8309b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -118,7 +119,7 @@ public class TestAppSchedulingInfo {
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
- new ResourceUsage(), new HashMap<>(), null);
+ new ResourceUsage(), new HashMap<>(), mock(RMContext.class));
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
index 5cea3a2..60e25ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert;
import java.util.Set;
@@ -76,4 +77,16 @@ public class CapacitySchedulerTestBase {
.getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
.getMemorySize() > 0);
}
+
+  /**
+   * Polls the scheduler until {@code getNumClusterNodes()} reports at least
+   * {@code nodecount} nodes, or until the timeout has elapsed.
+   *
+   * <p>The method returns normally in both cases; a caller that needs a hard
+   * guarantee has to assert on the node count afterwards.</p>
+   *
+   * @param scheduler scheduler whose cluster node count is polled
+   * @param nodecount minimum number of nodes to wait for
+   * @param timesec maximum time to wait, in seconds
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  protected void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
+      int timesec) throws InterruptedException {
+    // Multiply as long: "timesec * 1000" would be evaluated in int and could
+    // overflow for large timeouts.
+    final long timeoutMillis = timesec * 1000L;
+    final long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < timeoutMillis
+        && scheduler.getNumClusterNodes() < nodecount) {
+      Thread.sleep(100);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 8d948b5..e77d8e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -106,8 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyConta
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.placement
- .UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -172,7 +170,6 @@ import org.mockito.Mockito;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -4871,18 +4868,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
return cs;
}
- private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
- int timesec) throws InterruptedException {
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < timesec * 1000) {
- if (scheduler.getNumClusterNodes() < nodecount) {
- Thread.sleep(100);
- } else {
- break;
- }
- }
- }
-
@Test (timeout = 60000)
public void testClearRequestsBeforeApplyTheProposal()
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
new file mode 100644
index 0000000..c90af94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -0,0 +1,166 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for multi-node scheduling in the capacity scheduler, with
+ * ResourceUsageMultiNodeLookupPolicy configured as the node sorting policy.
+ */
+public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestCapacitySchedulerMultiNodes.class);
+  private CapacitySchedulerConfiguration conf;
+  private static final String POLICY_CLASS_NAME =
+      "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
+
+  /**
+   * Builds a capacity scheduler configuration with multi-node placement
+   * enabled and a single sorting policy, "resource-based", bound to
+   * {@link #POLICY_CLASS_NAME}.
+   */
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration config =
+        new CapacitySchedulerConfiguration();
+    config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class.getName());
+    conf = new CapacitySchedulerConfiguration(config);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+        "resource-based");
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+        "resource-based");
+    String policyName =
+        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based" + ".class";
+    conf.set(policyName, POLICY_CLASS_NAME);
+    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+        true);
+    conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
+    conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
+  }
+
+  /**
+   * Registers four nodes and verifies that, after a forced re-sort, the
+   * policy exposes all of them for the default partition.
+   */
+  @Test
+  public void testMultiNodeSorterForScheduling() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      rm.registerNode("127.0.0.1:1234", 10 * GB);
+      rm.registerNode("127.0.0.1:1235", 10 * GB);
+      rm.registerNode("127.0.0.1:1236", 10 * GB);
+      rm.registerNode("127.0.0.1:1237", 10 * GB);
+      ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+      waitforNMRegistered(scheduler, 4, 5);
+      MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+          .getMultiNodeSortingManager();
+      MultiNodeSorter<SchedulerNode> sorter = mns
+          .getMultiNodePolicy(POLICY_CLASS_NAME);
+      sorter.reSortClusterNodes();
+      Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
+          .getNodesPerPartition("");
+      Assert.assertEquals(4, nodes.size());
+    } finally {
+      // Stop the RM even when an assertion above fails.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Launches application masters on two of four nodes and verifies that the
+   * re-sorted node set lists the unused nodes first and the used nodes in
+   * order of increasing usage.
+   */
+  @Test
+  public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 10);
+      MockNM nm2 = rm.registerNode("127.0.0.2:1235", 10 * GB, 10);
+      MockNM nm3 = rm.registerNode("127.0.0.3:1236", 10 * GB, 10);
+      MockNM nm4 = rm.registerNode("127.0.0.4:1237", 10 * GB, 10);
+      ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+      waitforNMRegistered(scheduler, 4, 5);
+
+      MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+          .getMultiNodeSortingManager();
+      MultiNodeSorter<SchedulerNode> sorter = mns
+          .getMultiNodePolicy(POLICY_CLASS_NAME);
+      sorter.reSortClusterNodes();
+
+      Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
+          .getNodesPerPartition("");
+      Assert.assertEquals(4, nodes.size());
+
+      RMApp app1 = rm.submitApp(2048, "app-1", "user1", null, "default");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+      SchedulerNodeReport reportNm1 =
+          rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+      // check node report
+      Assert.assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize());
+      Assert.assertEquals(8 * GB,
+          reportNm1.getAvailableResource().getMemorySize());
+
+      // Ideally thread will invoke this, but thread operates every 1sec.
+      // Hence forcefully recompute nodes.
+      sorter.reSortClusterNodes();
+
+      RMApp app2 = rm.submitApp(1024, "app-2", "user2", null, "default");
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+      SchedulerNodeReport reportNm2 =
+          rm.getResourceScheduler().getNodeReport(nm2.getNodeId());
+
+      // check node report
+      Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
+      Assert.assertEquals(9 * GB,
+          reportNm2.getAvailableResource().getMemorySize());
+
+      // Ideally thread will invoke this, but thread operates every 1sec.
+      // Hence forcefully recompute nodes.
+      sorter.reSortClusterNodes();
+
+      // Node1 and Node2 now have used resources, so both must come later in
+      // the sorted set: the unused nm3 and nm4 first, then nm2 (1 GB used),
+      // then nm1 (2 GB used).
+      nodes = sorter.getMultiNodeLookupPolicy()
+          .getNodesPerPartition("");
+      List<NodeId> expectedOrder = new ArrayList<>();
+      expectedOrder.add(nm3.getNodeId());
+      expectedOrder.add(nm4.getNodeId());
+      expectedOrder.add(nm2.getNodeId());
+      expectedOrder.add(nm1.getNodeId());
+
+      // Check the size first so a shorter set cannot pass the loop vacuously.
+      Assert.assertEquals(expectedOrder.size(), nodes.size());
+      Iterator<SchedulerNode> it = nodes.iterator();
+      int i = 0;
+      while (it.hasNext()) {
+        SchedulerNode current = it.next();
+        // JUnit convention: expected value first, actual value second.
+        Assert.assertEquals(expectedOrder.get(i++), current.getNodeID());
+      }
+    } finally {
+      // Stop the RM even when an assertion above fails.
+      rm.stop();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index b4ebd15..e239191 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -817,4 +817,74 @@ public class TestCapacitySchedulerNodeLabelUpdate {
}
return memorySize;
}
+
+  /**
+   * Polls the capacity scheduler's node tracker until the number of nodes in
+   * the given partition equals the expected count, or the timeout elapses.
+   *
+   * @param rm resource manager whose scheduler is queried
+   * @param partition partition whose nodes are counted
+   * @param expectedNodeCount node count to wait for
+   * @param timeout maximum time to wait, in milliseconds
+   * @return the last observed node count; 0 if the partition was never polled
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition,
+      long expectedNodeCount, long timeout) throws InterruptedException {
+    final long begin = System.currentTimeMillis();
+    long observed = 0;
+    while (System.currentTimeMillis() - begin < timeout) {
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      observed = cs.getNodeTracker().getNodesPerPartition(partition).size();
+      if (observed != expectedNodeCount) {
+        // Not there yet: back off briefly and poll again.
+        Thread.sleep(100);
+        continue;
+      }
+      break;
+    }
+    return observed;
+  }
+
+ /**
+ * Verifies that the node tracker's per-partition node sets follow node
+ * registration, node un-registration and node label replacement.
+ */
+ @Test
+ public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker()
+ throws Exception {
+ // add labels x, y and z to the cluster
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+ ImmutableSet.of("x", "y", "z"));
+
+ // set mapping:
+ // h1 -> x
+ // h2 -> x
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x")));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x")));
+
+ // inject node label manager
+ MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.getRMContext().setNodeLabelManager(mgr);
+ rm.start();
+ MockNM nm1 = rm.registerNode("h1:1234", 8000);
+ rm.registerNode("h2:1234", 8000);
+ rm.registerNode("h3:1234", 8000);
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ // Ensure that cluster node tracker is updated with correct set of node
+ // after Node registration: h1 and h2 carry label x, h3 has no label.
+ Assert.assertEquals(2,
+ cs.getNodeTracker().getNodesPerPartition("x").size());
+ Assert.assertEquals(1, cs.getNodeTracker().getNodesPerPartition("").size());
+
+ rm.unRegisterNode(nm1);
+ rm.registerNode("h4:1234", 8000);
+
+ // Ensure that cluster node tracker is updated with correct set of node
+ // after h1 (label x) is unregistered and h4 (no label) is registered.
+ Assert.assertEquals(1,
+ cs.getNodeTracker().getNodesPerPartition("x").size());
+ Assert.assertEquals(2, cs.getNodeTracker().getNodesPerPartition("").size());
+
+ mgr.replaceLabelsOnNode(
+ ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("")));
+
+ // The label of h2, the last node with label x, is replaced with the empty
+ // label (presumably what a CLI or REST request would trigger). Wait up to
+ // 3000 ms for the node tracker to report no nodes in partition x.
+ Assert.assertEquals(0,
+ waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org