You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2017/02/22 23:33:54 UTC

[08/50] [abbrv] hadoop git commit: YARN-6156. AM blacklisting to consider node label partition (Bibin A Chundatt via Varun Saxena)

YARN-6156. AM blacklisting to consider node label partition (Bibin A Chundatt via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b7613e0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b7613e0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b7613e0f

Branch: refs/heads/HADOOP-13345
Commit: b7613e0f406fb2b9bd5b1b3c79658e801f63c587
Parents: cd3e59a
Author: Varun Saxena <va...@apache.org>
Authored: Wed Feb 15 14:48:17 2017 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Wed Feb 15 14:48:17 2017 +0530

----------------------------------------------------------------------
 .../server/resourcemanager/RMServerUtils.java   | 22 ++++++
 .../nodelabels/RMNodeLabelsManager.java         | 16 +++++
 .../server/resourcemanager/rmapp/RMAppImpl.java | 12 ++--
 .../rmapp/attempt/RMAppAttemptImpl.java         |  4 +-
 .../TestCapacitySchedulerNodeLabelUpdate.java   | 73 ++++++++++++++++++++
 5 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 224a1da..e98141b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
     .RMAppAttemptState;
@@ -561,4 +562,25 @@ public class RMServerUtils {
     }
     return newApplicationTimeout;
   }
+
+  /**
+   * Get applicable Node count for AM: the active NM count of the AM's node
+   * label when node labels are enabled, else the scheduler's cluster node count.
+   * @param rmContext context supplying the node label manager and scheduler
+   * @param conf configuration checked for node labels being enabled
+   * @param amreq am resource request; null/blank label maps to NO_LABEL
+   * @return applicable node count
+   */
+  public static int getApplicableNodeCountForAM(RMContext rmContext,
+      Configuration conf, ResourceRequest amreq) {
+    if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
+      RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager();
+      String amNodeLabelExpression = amreq.getNodeLabelExpression();
+      amNodeLabelExpression = (amNodeLabelExpression == null
+          || amNodeLabelExpression.trim().isEmpty())
+              ? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression;
+      return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression);
+    }
+    return rmContext.getScheduler().getNumClusterNodes();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index 5dc8392..effe422 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -350,6 +350,22 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
   }
   
+  /**
+   * Get active node count for label; 0 if null or not in labelCollections.
+   */
+  public int getActiveNMCountPerLabel(String label) {
+    if (label == null) {
+      return 0;
+    }
+    try {
+      readLock.lock();
+      RMNodeLabel labelInfo = labelCollections.get(label);
+      return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   public Set<String> getLabelsOnNode(NodeId nodeId) {
     try {
       readLock.lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 12ece3f..516109b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -46,9 +44,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.ipc.CallerContext;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -987,9 +983,11 @@ public class RMAppImpl implements RMApp, Recoverable {
       // Transfer over the blacklist from the previous app-attempt.
       currentAMBlacklistManager = currentAttempt.getAMBlacklistManager();
     } else {
-      if (amBlacklistingEnabled) {
+      if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
         currentAMBlacklistManager = new SimpleBlacklistManager(
-            scheduler.getNumClusterNodes(), blacklistDisableThreshold);
+            RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
+                getAMResourceRequest()),
+            blacklistDisableThreshold);
       } else {
         currentAMBlacklistManager = new DisabledBlacklistManager();
       }
@@ -1006,7 +1004,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }
-  
+
   private void
       createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
     createNewAttempt();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index ab84985..1788722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1057,7 +1057,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         appAttempt.amReq.setRelaxLocality(true);
 
         appAttempt.getAMBlacklistManager().refreshNodeHostCount(
-            appAttempt.scheduler.getNumClusterNodes());
+            RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext,
+                appAttempt.conf, appAttempt.amReq));
 
         ResourceBlacklistRequest amBlacklist =
             appAttempt.getAMBlacklistManager().getBlacklistUpdates();
@@ -1246,7 +1247,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
-
   private void rememberTargetTransitions(RMAppAttemptEvent event,
       Object transitionToDo, RMAppAttemptState targetFinalState) {
     transitionTodo = transitionToDo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 732b5d1..b4ebd15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -721,6 +723,77 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
+  @Test(timeout = 30000)
+  public void testBlacklistAMDisableLabel() throws Exception {
+    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
+        true);
+    conf.setFloat(
+        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
+        0.5f);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("x"),
+        NodeId.newInstance("h3", 0), toSet("x"), NodeId.newInstance("h6", 0),
+        toSet("x")));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("y"),
+        NodeId.newInstance("h5", 0), toSet("y"), NodeId.newInstance("h7", 0),
+        toSet("y")));
+
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    // Nodes in label default h1,h8,h9
+    // Nodes in label x h2,h3,h6
+    // Nodes in label y h4,h5,h7
+    MockNM nm1 = rm.registerNode("h1:1234", 2048);
+    MockNM nm2 = rm.registerNode("h2:1234", 2048);
+    rm.registerNode("h3:1234", 2048);
+    rm.registerNode("h4:1234", 2048);
+    rm.registerNode("h5:1234", 2048);
+    rm.registerNode("h6:1234", 2048);
+    rm.registerNode("h7:1234", 2048);
+    rm.registerNode("h8:1234", 2048);
+    rm.registerNode("h9:1234", 2048);
+
+    // Submit app with AM container launched on default partition i.e. h1.
+    RMApp app = rm.submitApp(GB, "app", "user", null, "a");
+    MockRM.launchAndRegisterAM(app, rm, nm1);
+    RMAppAttempt appAttempt = app.getCurrentAppAttempt();
+    // Blacklist the first node from the default partition
+    appAttempt.getAMBlacklistManager().addNode("h1");
+    ResourceBlacklistRequest blacklistUpdates =
+        appAttempt.getAMBlacklistManager().getBlacklistUpdates();
+    Assert.assertEquals(1, blacklistUpdates.getBlacklistAdditions().size());
+    Assert.assertEquals(0, blacklistUpdates.getBlacklistRemovals().size());
+    // Adding second node from default partition
+    appAttempt.getAMBlacklistManager().addNode("h8");
+    blacklistUpdates = appAttempt.getAMBlacklistManager().getBlacklistUpdates();
+    Assert.assertEquals(0, blacklistUpdates.getBlacklistAdditions().size());
+    Assert.assertEquals(2, blacklistUpdates.getBlacklistRemovals().size());
+
+    // Submission in label x
+    RMApp applabel = rm.submitApp(GB, "app", "user", null, "a", "x");
+    MockRM.launchAndRegisterAM(applabel, rm, nm2);
+    RMAppAttempt appAttemptlabelx = applabel.getCurrentAppAttempt();
+    appAttemptlabelx.getAMBlacklistManager().addNode("h2");
+    ResourceBlacklistRequest blacklistUpdatesOnx =
+        appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
+    Assert.assertEquals(1, blacklistUpdatesOnx.getBlacklistAdditions().size());
+    Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistRemovals().size());
+    // Adding second node from partition x; read updates from the x attempt
+    appAttemptlabelx.getAMBlacklistManager().addNode("h3");
+    blacklistUpdatesOnx =
+        appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
+    Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistAdditions().size());
+    Assert.assertEquals(2, blacklistUpdatesOnx.getBlacklistRemovals().size());
+
+    rm.close();
+  }
+
   private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
       String label) throws InterruptedException {
     Assert.assertEquals(memory,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org