You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2022/04/13 13:57:32 UTC

[hadoop] branch trunk updated: YARN-11107. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. Contributed by zhangxiping1

This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b46aae977e YARN-11107. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. Contributed by zhangxiping1
3b46aae977e is described below

commit 3b46aae977e078cf7eb5e6bbbc55aca7cecee4c7
Author: 章锡平 <40...@users.noreply.github.com>
AuthorDate: Fri Apr 8 15:19:17 2022 +0800

    YARN-11107. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. Contributed by zhangxiping1
---
 .../resourcemanager/DefaultAMSProcessor.java       |  16 +-
 .../TestApplicationMasterServiceCapacity.java      | 163 +++++++++++++++++++++
 2 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 12a3b21682a..3797a6ed3a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -351,7 +351,21 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
         ((AbstractYarnScheduler)getScheduler())
             .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
 
-    response.setNumClusterNodes(getScheduler().getNumClusterNodes());
+    String label="";
+    try {
+      label = rmContext.getScheduler()
+          .getQueueInfo(app.getQueue(), false, false)
+          .getDefaultNodeLabelExpression();
+    } catch (Exception e){
+      //Queue may not exist since it could be auto-created in case of
+      // dynamic queues
+    }
+
+    if (label == null || label.equals("")) {
+      response.setNumClusterNodes(getScheduler().getNumClusterNodes());
+    } else {
+      response.setNumClusterNodes(rmContext.getNodeLabelManager().getActiveNMCountPerLabel(label));
+    }
 
     // add collector address for this application
     if (timelineServiceV2Enabled) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java
index 182016a6cfa..ccbc375359a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java
@@ -16,36 +16,46 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.ImmutableMap;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSet;
 import static org.junit.Assert.fail;
 
 /**
@@ -208,4 +218,157 @@ public class TestApplicationMasterServiceCapacity extends
     Assert.assertEquals(appPriority2, response2.getApplicationPriority());
     rm.stop();
   }
+
+  @Test(timeout = 300000)
+  public void testGetNMNumInAllocatedResponseWithOutNodeLabel() throws Exception {
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1 node2 node3
+    MockNM nm1 = rm.registerNode("host1:1234", 6 * GB);
+    MockNM nm2 = rm.registerNode("host2:1234", 6 * GB);
+    MockNM nm3 = rm.registerNode("host3:1234", 6 * GB);
+
+    // Submit an application without specifying a queue or label expression
+    MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
+        .createWithMemory(2048, rm)
+        .build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
+    List<ContainerId> release = new ArrayList<>();
+    List<ResourceRequest> ask = new ArrayList<>();
+    allocateRequest.setReleaseList(release);
+    allocateRequest.setAskList(ask);
+    // Without a label expression, all 3 registered nodes are expected to be reported
+    AllocateResponse response1 = am1.allocate(allocateRequest);
+    Assert.assertEquals(3, response1.getNumClusterNodes());
+
+    rm.stop();
+  }
+
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(config);
+    // Top-level queues "a" and "b"; root gets 100% of partitions "x" and "y"
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    // root.a: may only access "x", which is also its default label expression
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 50);
+    csConf.setMaximumCapacity(queueA, 100);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setDefaultNodeLabelExpression(queueA, "x");
+    csConf.setCapacityByLabel(queueA, "x", 100);
+
+    // root.b: may only access "y", which is also its default label expression
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 50);
+    csConf.setMaximumCapacity(queueB, 100);
+    csConf.setAccessibleNodeLabels(queueB, toSet("y"));
+    csConf.setDefaultNodeLabelExpression(queueB, "y");
+    csConf.setCapacityByLabel(queueB, "y", 100);
+    return csConf;
+  }
+
+  @Test(timeout = 300000)
+  public void testGetNMNumInAllocatedResponseWithNodeLabel() throws Exception {
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new RMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+
+    // Add cluster node labels "x" and "y", then map hosts to them
+    Set<String> clusterNodeLabels = new HashSet<>();
+    clusterNodeLabels.add("x");
+    clusterNodeLabels.add("y");
+
+    RMNodeLabelsManager nodeLabelManager = rm.getRMContext().getNodeLabelManager();
+    nodeLabelManager.
+        addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels);
+
+    // host1-host3 carry label "x"; host4 carries label "y"
+    nodeLabelManager
+        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host1", 1234), toSet("x")));
+    nodeLabelManager
+        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host2", 1234), toSet("x")));
+    nodeLabelManager
+        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host3", 1234), toSet("x")));
+    nodeLabelManager
+        .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host4", 1234), toSet("y")));
+    rm.start();
+
+    // Register node1 node2 node3 node4
+    MockNM nm1 = rm.registerNode("host1:1234", 6 * GB);
+    MockNM nm2 = rm.registerNode("host2:1234", 6 * GB);
+    MockNM nm3 = rm.registerNode("host3:1234", 6 * GB);
+    MockNM nm4 = rm.registerNode("host4:1234", 6 * GB);
+
+    // Submit app1 to root.a, whose default label expression is "x"
+    MockRMAppSubmissionData data1 = MockRMAppSubmissionData.Builder
+        .createWithMemory(2048, rm)
+        .withAppName("someApp1")
+        .withUser("someUser")
+        .withQueue("root.a")
+        .build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // Submit app2 to root.b, whose default label expression is "y"
+    MockRMAppSubmissionData data2 = MockRMAppSubmissionData.Builder
+        .createWithMemory(2048, rm)
+        .withAppName("someApp2")
+        .withUser("someUser")
+        .withQueue("root.b")
+        .build();
+    RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm4);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
+    RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
+
+    // Process several node heartbeats BEFORE the AMs allocate
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode4));
+    }
+
+    AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
+    List<ContainerId> release = new ArrayList<>();
+    List<ResourceRequest> ask = new ArrayList<>();
+    allocateRequest.setReleaseList(release);
+    allocateRequest.setAskList(ask);
+
+    AllocateResponse response1 = am1.allocate(allocateRequest);
+    AllocateResponse response2 = am2.allocate(allocateRequest);
+
+    // app1's response counts only the 3 nodes labeled "x"
+    Assert.assertEquals(3, response1.getNumClusterNodes());
+
+    // app2's response counts only the 1 node labeled "y"
+    Assert.assertEquals(1, response2.getNumClusterNodes());
+
+    rm.stop();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org