You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:25 UTC

[08/50] [abbrv] hadoop git commit: YARN-7494. Add multi-node lookup mechanism and pluggable nodes sorting policies to optimize placement decision. Contributed by Sunil Govindan.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java
new file mode 100644
index 0000000..d765af8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Comparator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * <p>
+ * This class has the following functionality:
+ * </p>
+ * <p>
+ * ResourceUsageMultiNodeLookupPolicy holds a sorted set of nodes, ordered by
+ * the allocated resource of each node at the time the set was last refreshed.
+ * </p>
+ */
+public class ResourceUsageMultiNodeLookupPolicy<N extends SchedulerNode>
+    implements MultiNodeLookupPolicy<N> {
+
+  protected Map<String, Set<N>> nodesPerPartition = new ConcurrentHashMap<>();
+  protected Comparator<N> comparator;
+
+  public ResourceUsageMultiNodeLookupPolicy() {
+    this.comparator = new Comparator<N>() {
+      @Override
+      public int compare(N o1, N o2) {
+        int allocatedDiff = o1.getAllocatedResource()
+            .compareTo(o2.getAllocatedResource());
+        if (allocatedDiff == 0) {
+          return o1.getNodeID().compareTo(o2.getNodeID());
+        }
+        return allocatedDiff;
+      }
+    };
+  }
+
+  @Override
+  public Iterator<N> getPreferredNodeIterator(Collection<N> nodes,
+      String partition) {
+    return getNodesPerPartition(partition).iterator();
+  }
+
+  @Override
+  public void addAndRefreshNodesSet(Collection<N> nodes,
+      String partition) {
+    Set<N> nodeList = new ConcurrentSkipListSet<N>(comparator);
+    nodeList.addAll(nodes);
+    nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList));
+  }
+
+  @Override
+  public Set<N> getNodesPerPartition(String partition) {
+    return nodesPerPartition.getOrDefault(partition, Collections.emptySet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index eef86a4..09d3327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -295,6 +296,8 @@ public class ReservationSystemTestUtil {
         });
 
     mockRmContext.setNodeLabelManager(nlm);
+    mockRmContext
+        .setMultiNodeSortingManager(mock(MultiNodeSortingManager.class));
     return mockRmContext;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index b7b0eb7..df8309b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -118,7 +119,7 @@ public class TestAppSchedulingInfo {
     doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
     AppSchedulingInfo  info = new AppSchedulingInfo(
         appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
-        new ResourceUsage(), new HashMap<>(), null);
+        new ResourceUsage(), new HashMap<>(), mock(RMContext.class));
     Assert.assertEquals(0, info.getSchedulerKeys().size());
 
     Priority pri1 = Priority.newInstance(1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
index 5cea3a2..60e25ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.junit.Assert;
 
 import java.util.Set;
@@ -76,4 +77,16 @@ public class CapacitySchedulerTestBase {
         .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
         .getMemorySize() > 0);
   }
+
+  protected void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
+      int timesec) throws InterruptedException {
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < timesec * 1000) {
+      if (scheduler.getNumClusterNodes() < nodecount) {
+        Thread.sleep(100);
+      } else {
+        break;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 8d948b5..e77d8e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -106,8 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyConta
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 
-import org.apache.hadoop.yarn.server.resourcemanager.placement
-    .UserGroupMappingPlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -172,7 +170,6 @@ import org.mockito.Mockito;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -4871,18 +4868,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     return cs;
   }
 
-  private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
-      int timesec) throws InterruptedException {
-    long start = System.currentTimeMillis();
-    while (System.currentTimeMillis() - start < timesec * 1000) {
-      if (scheduler.getNumClusterNodes() < nodecount) {
-        Thread.sleep(100);
-      } else {
-        break;
-      }
-    }
-  }
-
   @Test (timeout = 60000)
   public void testClearRequestsBeforeApplyTheProposal()
       throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
new file mode 100644
index 0000000..c90af94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -0,0 +1,166 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for Multi Node scheduling related tests.
+ */
+public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestCapacitySchedulerMultiNodes.class);
+  private CapacitySchedulerConfiguration conf;
+  private static final String POLICY_CLASS_NAME =
+      "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
+
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration config =
+        new CapacitySchedulerConfiguration();
+    config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class.getName());
+    conf = new CapacitySchedulerConfiguration(config);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+        "resource-based");
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+        "resource-based");
+    String policyName =
+        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+            + ".resource-based" + ".class";
+    conf.set(policyName, POLICY_CLASS_NAME);
+    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+        true);
+    conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
+    conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
+  }
+
+  @Test
+  public void testMultiNodeSorterForScheduling() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    rm.registerNode("127.0.0.1:1234", 10 * GB);
+    rm.registerNode("127.0.0.1:1235", 10 * GB);
+    rm.registerNode("127.0.0.1:1236", 10 * GB);
+    rm.registerNode("127.0.0.1:1237", 10 * GB);
+    ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+    waitforNMRegistered(scheduler, 4, 5);
+    MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+        .getMultiNodeSortingManager();
+    MultiNodeSorter<SchedulerNode> sorter = mns
+        .getMultiNodePolicy(POLICY_CLASS_NAME);
+    sorter.reSortClusterNodes();
+    Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
+        .getNodesPerPartition("");
+    Assert.assertEquals(4, nodes.size());
+    rm.stop();
+  }
+
+  @Test
+  public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 10);
+    MockNM nm2 = rm.registerNode("127.0.0.2:1235", 10 * GB, 10);
+    MockNM nm3 = rm.registerNode("127.0.0.3:1236", 10 * GB, 10);
+    MockNM nm4 = rm.registerNode("127.0.0.4:1237", 10 * GB, 10);
+    ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+    waitforNMRegistered(scheduler, 4, 5);
+
+    MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+        .getMultiNodeSortingManager();
+    MultiNodeSorter<SchedulerNode> sorter = mns
+        .getMultiNodePolicy(POLICY_CLASS_NAME);
+    sorter.reSortClusterNodes();
+
+    Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
+        .getNodesPerPartition("");
+    Assert.assertEquals(4, nodes.size());
+
+    RMApp app1 = rm.submitApp(2048, "app-1", "user1", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    SchedulerNodeReport reportNm1 =
+        rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+    // check node report
+    Assert.assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(8 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    // Normally a thread invokes this, but that thread only runs every 1 sec.
+    // Hence force the nodes to be re-sorted here.
+    sorter.reSortClusterNodes();
+
+    RMApp app2 = rm.submitApp(1024, "app-2", "user2", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+    SchedulerNodeReport reportNm2 =
+        rm.getResourceScheduler().getNodeReport(nm2.getNodeId());
+
+    // check node report
+    Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(9 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    // Normally a thread invokes this, but that thread only runs every 1 sec.
+    // Hence force the nodes to be re-sorted here.
+    sorter.reSortClusterNodes();
+
+    // Node1 and Node2 now have used resources. Hence ensure these two come
+    // later in the list.
+    nodes = sorter.getMultiNodeLookupPolicy()
+        .getNodesPerPartition("");
+    List<NodeId> currentNodes = new ArrayList<>();
+    currentNodes.add(nm3.getNodeId());
+    currentNodes.add(nm4.getNodeId());
+    currentNodes.add(nm2.getNodeId());
+    currentNodes.add(nm1.getNodeId());
+    Iterator<SchedulerNode> it = nodes.iterator();
+    SchedulerNode current;
+    int i = 0;
+    while (it.hasNext()) {
+      current = it.next();
+      Assert.assertEquals(current.getNodeID(), currentNodes.get(i++));
+    }
+    rm.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index b4ebd15..e239191 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -817,4 +817,74 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     }
     return memorySize;
   }
+
+  private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition,
+      long expectedNodeCount, long timeout) throws InterruptedException {
+    long start = System.currentTimeMillis();
+    long size = 0;
+    while (System.currentTimeMillis() - start < timeout) {
+      CapacityScheduler scheduler = (CapacityScheduler) rm
+          .getResourceScheduler();
+      size = scheduler.getNodeTracker().getNodesPerPartition(partition).size();
+      if (size == expectedNodeCount) {
+        return size;
+      }
+      Thread.sleep(100);
+    }
+    return size;
+  }
+
+  @Test
+  public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker()
+      throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+
+    // set mapping:
+    // h1 -> x
+    // h2 -> x
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 8000);
+    rm.registerNode("h2:1234", 8000);
+    rm.registerNode("h3:1234", 8000);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Ensure that cluster node tracker is updated with correct set of node
+    // after Node registration.
+    Assert.assertEquals(2,
+        cs.getNodeTracker().getNodesPerPartition("x").size());
+    Assert.assertEquals(1, cs.getNodeTracker().getNodesPerPartition("").size());
+
+    rm.unRegisterNode(nm1);
+    rm.registerNode("h4:1234", 8000);
+
+    // Ensure that the cluster node tracker is updated with the correct set of
+    // nodes after a new node is registered and an old node is unregistered.
+    Assert.assertEquals(1,
+        cs.getNodeTracker().getNodesPerPartition("x").size());
+    Assert.assertEquals(2, cs.getNodeTracker().getNodesPerPartition("").size());
+
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("")));
+
+    // Last node with label x is replaced by CLI or REST.
+    Assert.assertEquals(0,
+        waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org