You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/02 17:17:44 UTC

[21/50] [abbrv] hadoop git commit: YARN-8175. Add support for Node Labels in SLS. Contributed by Abhishek Modi.

YARN-8175. Add support for Node Labels in SLS. Contributed by Abhishek Modi.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9fea5c9e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9fea5c9e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9fea5c9e

Branch: refs/heads/YARN-7402
Commit: 9fea5c9ee76bd36f273ae93afef5f3ef3c477a53
Parents: b28bdc7
Author: Inigo Goiri <in...@apache.org>
Authored: Tue Jul 31 09:36:34 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Tue Jul 31 09:36:34 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/yarn/sls/SLSRunner.java   | 93 +++++++++++++++-----
 .../hadoop/yarn/sls/appmaster/AMSimulator.java  |  9 +-
 .../yarn/sls/appmaster/MRAMSimulator.java       |  5 +-
 .../yarn/sls/appmaster/StreamAMSimulator.java   |  5 +-
 .../hadoop/yarn/sls/conf/SLSConfiguration.java  |  1 +
 .../yarn/sls/nodemanager/NMSimulator.java       | 13 ++-
 .../apache/hadoop/yarn/sls/utils/SLSUtils.java  | 58 ++++++++----
 .../yarn/sls/appmaster/TestAMSimulator.java     | 35 +++++++-
 .../hadoop/yarn/sls/utils/TestSLSUtils.java     | 64 ++++++++++----
 .../test/resources/nodes-with-resources.json    |  8 +-
 .../hadoop/yarn/client/cli/RMAdminCLI.java      | 71 +--------------
 .../yarn/client/util/YarnClientUtils.java       | 77 ++++++++++++++++
 12 files changed, 301 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index e859732..1e83e40 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -298,30 +299,20 @@ public class SLSRunner extends Configured implements Tool {
         SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
         SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
-    Map<String, Resource> nodeResourceMap = new HashMap<>();
-    Set<? extends  String> nodeSet;
+    Set<NodeDetails> nodeSet = null;
     if (nodeFile.isEmpty()) {
       for (String inputTrace : inputTraces) {
         switch (inputType) {
         case SLS:
           nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
-          for (String node : nodeSet) {
-            nodeResourceMap.put(node, null);
-          }
           break;
         case RUMEN:
           nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
-          for (String node : nodeSet) {
-            nodeResourceMap.put(node, null);
-          }
           break;
         case SYNTH:
           stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
           nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
               stjp.getNumNodes()/stjp.getNodesPerRack());
-          for (String node : nodeSet) {
-            nodeResourceMap.put(node, null);
-          }
           break;
         default:
           throw new YarnException("Input configuration not recognized, "
@@ -329,11 +320,11 @@ public class SLSRunner extends Configured implements Tool {
         }
       }
     } else {
-      nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile,
+      nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
           nodeManagerResource);
     }
 
-    if (nodeResourceMap.size() == 0) {
+    if (nodeSet == null || nodeSet.isEmpty()) {
       throw new YarnException("No node! Please configure nodes.");
     }
 
@@ -344,20 +335,21 @@ public class SLSRunner extends Configured implements Tool {
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     ExecutorService executorService = Executors.
         newFixedThreadPool(threadPoolSize);
-    for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
+    for (NodeDetails nodeDetails : nodeSet) {
       executorService.submit(new Runnable() {
         @Override public void run() {
           try {
             // we randomize the heartbeat start time from zero to 1 interval
             NMSimulator nm = new NMSimulator();
             Resource nmResource = nodeManagerResource;
-            String hostName = entry.getKey();
-            if (entry.getValue() != null) {
-              nmResource = entry.getValue();
+            String hostName = nodeDetails.getHostname();
+            if (nodeDetails.getNodeResource() != null) {
+              nmResource = nodeDetails.getNodeResource();
             }
+            Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
             nm.init(hostName, nmResource,
                 random.nextInt(heartbeatInterval),
-                heartbeatInterval, rm, resourceUtilizationRatio);
+                heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
             nmMap.put(nm.getNode().getNodeID(), nm);
             runner.schedule(nm);
             rackSet.add(nm.getNode().getRackName());
@@ -452,6 +444,11 @@ public class SLSRunner extends Configured implements Tool {
           jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
     }
 
+    String jobLabelExpr = null;
+    if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
+      jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
+    }
+
     String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
     if (user == null) {
       user = "default";
@@ -481,7 +478,8 @@ public class SLSRunner extends Configured implements Tool {
 
     for (int i = 0; i < jobCount; i++) {
       runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
-          getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
+          getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
+          jobLabelExpr);
     }
   }
 
@@ -730,7 +728,7 @@ public class SLSRunner extends Configured implements Tool {
 
       runNewAM(job.getType(), user, jobQueue, oldJobId,
           jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
-          job.getDeadline(), getAMContainerResource(null),
+          job.getDeadline(), getAMContainerResource(null), null,
           job.getParams());
     }
   }
@@ -775,15 +773,24 @@ public class SLSRunner extends Configured implements Tool {
       Resource amContainerResource) {
     runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
         jobFinishTimeMS, containerList, null,  -1,
-        amContainerResource, null);
+        amContainerResource, null, null);
   }
 
   private void runNewAM(String jobType, String user,
       String jobQueue, String oldJobId, long jobStartTimeMS,
       long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      ReservationId reservationId, long deadline, Resource amContainerResource,
-      Map<String, String> params) {
+      Resource amContainerResource, String labelExpr) {
+    runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
+        jobFinishTimeMS, containerList, null,  -1,
+        amContainerResource, labelExpr, null);
+  }
 
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void runNewAM(String jobType, String user,
+      String jobQueue, String oldJobId, long jobStartTimeMS,
+      long jobFinishTimeMS, List<ContainerSimulator> containerList,
+      ReservationId reservationId, long deadline, Resource amContainerResource,
+      String labelExpr, Map<String, String> params) {
     AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
         amClassMap.get(jobType), new Configuration());
 
@@ -799,7 +806,7 @@ public class SLSRunner extends Configured implements Tool {
       AM_ID++;
       amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
           jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-          runner.getStartTimeMS(), amContainerResource, params);
+          runner.getStartTimeMS(), amContainerResource, labelExpr, params);
       if(reservationId != null) {
         // if we have a ReservationId, delegate reservation creation to
         // AMSim (reservation shape is impl specific)
@@ -985,4 +992,42 @@ public class SLSRunner extends Configured implements Tool {
     System.err.println();
   }
 
+  /**
+   * Class to encapsulate all details about the node.
+   */
+  @Private
+  @Unstable
+  public static class NodeDetails {
+    private String hostname;
+    private Resource nodeResource;
+    private Set<NodeLabel> labels;
+
+    public NodeDetails(String nodeHostname) {
+      this.hostname = nodeHostname;
+    }
+
+    public String getHostname() {
+      return hostname;
+    }
+
+    public void setHostname(String hostname) {
+      this.hostname = hostname;
+    }
+
+    public Resource getNodeResource() {
+      return nodeResource;
+    }
+
+    public void setNodeResource(Resource nodeResource) {
+      this.nodeResource = nodeResource;
+    }
+
+    public Set<NodeLabel> getLabels() {
+      return labels;
+    }
+
+    public void setLabels(Set<NodeLabel> labels) {
+      this.labels = labels;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 8e1c256..5f34cfc 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -88,6 +88,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
   private int responseId = 0;
   // user name
   private String user;
+  // nodelabel expression
+  private String nodeLabelExpression;
   // queue name
   protected String queue;
   // am type
@@ -123,7 +125,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
       List<ContainerSimulator> containerList, ResourceManager resourceManager,
       SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
       String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource, Map<String, String> params) {
+      Resource amResource, String nodeLabelExpr,
+      Map<String, String> params) {
     super.init(startTime, startTime + 1000000L * heartbeatInterval,
         heartbeatInterval);
     this.user = simUser;
@@ -136,6 +139,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
     this.traceStartTimeMS = startTime;
     this.traceFinishTimeMS = finishTime;
     this.amContainerResource = amResource;
+    this.nodeLabelExpression = nodeLabelExpr;
   }
 
   /**
@@ -327,6 +331,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
     conLauContext.setServiceData(new HashMap<>());
     appSubContext.setAMContainerSpec(conLauContext);
     appSubContext.setResource(amContainerResource);
+    if (nodeLabelExpression != null) {
+      appSubContext.setNodeLabelExpression(nodeLabelExpression);
+    }
 
     if(reservationId != null) {
       appSubContext.setReservationID(reservationId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 6f0f85f..71fc5b2 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -126,10 +126,11 @@ public class MRAMSimulator extends AMSimulator {
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, Map<String, String> params) {
+      Resource amContainerResource, String nodeLabelExpr,
+      Map<String, String> params) {
     super.init(heartbeatInterval, containerList, rm, se,
         traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
-        baselineStartTimeMS, amContainerResource, params);
+        baselineStartTimeMS, amContainerResource, nodeLabelExpr, params);
     amtype = "mapreduce";
 
     // get map/reduce tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index b41f5f2..862e5ec 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -96,10 +96,11 @@ public class StreamAMSimulator extends AMSimulator {
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource, Map<String, String> params) {
+      Resource amContainerResource, String nodeLabelExpr,
+      Map<String, String> params) {
     super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
         traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
-        amContainerResource, params);
+        amContainerResource, nodeLabelExpr, params);
     amtype = "stream";
 
     allStreams.addAll(containerList);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
index ea73bef..09f653f 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/conf/SLSConfiguration.java
@@ -104,6 +104,7 @@ public class SLSConfiguration {
   public static final String JOB_START_MS = JOB_PREFIX + "start.ms";
   public static final String JOB_END_MS = JOB_PREFIX + "end.ms";
   public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name";
+  public static final String JOB_LABEL_EXPR = JOB_PREFIX + "label.expression";
   public static final String JOB_USER = JOB_PREFIX + "user";
   public static final String JOB_COUNT = JOB_PREFIX + "count";
   public static final String JOB_TASKS = JOB_PREFIX + "tasks";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index 428a839..6a8430e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,7 +80,7 @@ public class NMSimulator extends TaskRunner.Task {
   
   public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
       int heartBeatInterval, ResourceManager pRm,
-      float pResourceUtilizationRatio)
+      float pResourceUtilizationRatio, Set<NodeLabel> labels)
       throws IOException, YarnException {
     super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
         heartBeatInterval);
@@ -102,6 +104,7 @@ public class NMSimulator extends TaskRunner.Task {
             Records.newRecord(RegisterNodeManagerRequest.class);
     req.setNodeId(node.getNodeID());
     req.setResource(node.getTotalCapability());
+    req.setNodeLabels(labels);
     req.setHttpPort(80);
     RegisterNodeManagerResponse response = this.rm.getResourceTrackerService()
             .registerNodeManager(req);
@@ -109,6 +112,14 @@ public class NMSimulator extends TaskRunner.Task {
     this.resourceUtilizationRatio = pResourceUtilizationRatio;
   }
 
+  public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
+      int heartBeatInterval, ResourceManager pRm,
+      float pResourceUtilizationRatio)
+      throws IOException, YarnException {
+    init(nodeIdStr, nodeResource, dispatchTime, heartBeatInterval, pRm,
+        pResourceUtilizationRatio, null);
+  }
+
   @Override
   public void firstStep() {
     // do nothing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index f2129d0..8bb4871 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -41,8 +40,11 @@ import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
+import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -52,6 +54,10 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class SLSUtils {
   public final static String DEFAULT_JOB_TYPE = "mapreduce";
 
+  private static final String LABEL_FORMAT_ERR_MSG =
+      "Input format for adding node-labels is not correct, it should be "
+          + "labelName1[(exclusive=true/false)],labelName2[] ..";
+
   // hostname includes the network path and the host name. for example
   // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".
   // the function returns two Strings, the first element is the network
@@ -66,9 +72,9 @@ public class SLSUtils {
   /**
    * parse the rumen trace file, return each host name
    */
-  public static Set<String> parseNodesFromRumenTrace(String jobTrace)
-          throws IOException {
-    Set<String> nodeSet = new HashSet<String>();
+  public static Set<NodeDetails> parseNodesFromRumenTrace(
+      String jobTrace) throws IOException {
+    Set<NodeDetails> nodeSet = new HashSet<>();
 
     File fin = new File(jobTrace);
     Configuration conf = new Configuration();
@@ -85,7 +91,8 @@ public class SLSUtils {
           }
           LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
                   .get(mapTask.getAttempts().size() - 1);
-          nodeSet.add(taskAttempt.getHostName().getValue());
+          nodeSet.add(new NodeDetails(
+              taskAttempt.getHostName().getValue()));
         }
         for(LoggedTask reduceTask : job.getReduceTasks()) {
           if (reduceTask.getAttempts().size() == 0) {
@@ -93,7 +100,8 @@ public class SLSUtils {
           }
           LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
                   .get(reduceTask.getAttempts().size() - 1);
-          nodeSet.add(taskAttempt.getHostName().getValue());
+          nodeSet.add(new NodeDetails(
+              taskAttempt.getHostName().getValue()));
         }
       }
     } finally {
@@ -106,9 +114,9 @@ public class SLSUtils {
   /**
    * parse the sls trace file, return each host name
    */
-  public static Set<String> parseNodesFromSLSTrace(String jobTrace)
-          throws IOException {
-    Set<String> nodeSet = new HashSet<>();
+  public static Set<NodeDetails> parseNodesFromSLSTrace(
+      String jobTrace) throws IOException {
+    Set<NodeDetails> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -124,7 +132,8 @@ public class SLSUtils {
     return nodeSet;
   }
 
-  private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
+  private static void addNodes(Set<NodeDetails> nodeSet,
+      Map jsonEntry) {
     if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) {
       int numNodes = Integer.parseInt(
           jsonEntry.get(SLSConfiguration.NUM_NODES).toString());
@@ -142,7 +151,7 @@ public class SLSUtils {
         Map jsonTask = (Map) o;
         String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
         if (hostname != null) {
-          nodeSet.add(hostname);
+          nodeSet.add(new NodeDetails(hostname));
         }
       }
     }
@@ -150,10 +159,11 @@ public class SLSUtils {
 
   /**
    * parse the input node file, return each host name
+   * sample input: label1(exclusive=true),label2(exclusive=false),label3
    */
-  public static Map<String, Resource> parseNodesFromNodeFile(String nodeFile,
-      Resource nmDefaultResource) throws IOException {
-    Map<String, Resource> nodeResourceMap = new HashMap<>();
+  public static Set<NodeDetails> parseNodesFromNodeFile(
+      String nodeFile, Resource nmDefaultResource) throws IOException {
+    Set<NodeDetails> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -166,6 +176,8 @@ public class SLSUtils {
         List tasks = (List) jsonE.get("nodes");
         for (Object o : tasks) {
           Map jsonNode = (Map) o;
+          NodeDetails nodeDetails = new NodeDetails(
+              rack + "/" + jsonNode.get("node"));
           Resource nodeResource = Resources.clone(nmDefaultResource);
           ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
           for (ResourceInformation info : infors) {
@@ -174,18 +186,25 @@ public class SLSUtils {
                   Integer.parseInt(jsonNode.get(info.getName()).toString()));
             }
           }
-          nodeResourceMap.put(rack + "/" + jsonNode.get("node"), nodeResource);
+          nodeDetails.setNodeResource(nodeResource);
+          if (jsonNode.get("labels") != null) {
+            Set<NodeLabel> nodeLabels =  new HashSet<>(
+                YarnClientUtils.buildNodeLabelsFromStr(
+                    jsonNode.get("labels").toString()));
+            nodeDetails.setLabels(nodeLabels);
+          }
+          nodeSet.add(nodeDetails);
         }
       }
     } finally {
       input.close();
     }
-    return nodeResourceMap;
+    return nodeSet;
   }
 
-  public static Set<? extends String> generateNodes(int numNodes,
+  public static Set<NodeDetails> generateNodes(int numNodes,
       int numRacks){
-    Set<String> nodeSet = new HashSet<>();
+    Set<NodeDetails> nodeSet = new HashSet<>();
     if (numRacks < 1) {
       numRacks = 1;
     }
@@ -195,7 +214,8 @@ public class SLSUtils {
     }
 
     for (int i = 0; i < numNodes; i++) {
-      nodeSet.add("/rack" + i % numRacks + "/node" + i);
+      nodeSet.add(new NodeDetails(
+          "/rack" + i % numRacks + "/node" + i));
     }
     return nodeSet;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index bc8ea70..2efa846 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.yarn.sls.appmaster;
 
 import com.codahale.metrics.MetricRegistry;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@@ -42,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 @RunWith(Parameterized.class)
 public class TestAMSimulator {
@@ -73,6 +77,7 @@ public class TestAMSimulator {
     conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString());
     conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
     conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
+    conf.set(YarnConfiguration.NODE_LABELS_ENABLED, "true");
     conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true);
     rm = new ResourceManager();
     rm.init(conf);
@@ -140,7 +145,7 @@ public class TestAMSimulator {
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
     app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-        appId, 0, SLSConfiguration.getAMContainerResource(conf), null);
+        appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null);
     app.firstStep();
 
     verifySchedulerMetrics(appId);
@@ -152,6 +157,34 @@ public class TestAMSimulator {
     app.lastStep();
   }
 
+  @Test
+  public void testAMSimulatorWithNodeLabels() throws Exception {
+    if (scheduler.equals(CapacityScheduler.class)) {
+      // add label to the cluster
+      RMAdminCLI rmAdminCLI = new RMAdminCLI(conf);
+      String[] args = {"-addToClusterNodeLabels", "label1"};
+      rmAdminCLI.run(args);
+
+      MockAMSimulator app = new MockAMSimulator();
+      String appId = "app1";
+      String queue = "default";
+      List<ContainerSimulator> containers = new ArrayList<>();
+      app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
+          appId, 0, SLSConfiguration.getAMContainerResource(conf),
+          "label1", null);
+      app.firstStep();
+
+      verifySchedulerMetrics(appId);
+
+      ConcurrentMap<ApplicationId, RMApp> rmApps =
+          rm.getRMContext().getRMApps();
+      Assert.assertEquals(1, rmApps.size());
+      RMApp rmApp = rmApps.get(app.appId);
+      Assert.assertNotNull(rmApp);
+      Assert.assertEquals("label1", rmApp.getAmNodeLabelExpression());
+    }
+  }
+
   @After
   public void tearDown() {
     if (rm != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
index 5e586b13..c59c2af 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.yarn.sls.utils;
 
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 public class TestSLSUtils {
@@ -45,28 +45,54 @@ public class TestSLSUtils {
   @Test
   public void testParseNodesFromNodeFile() throws Exception {
     String nodeFile = "src/test/resources/nodes.json";
-    Map<String, Resource> nodeResourceMap = SLSUtils.parseNodesFromNodeFile(
+    Set<NodeDetails> nodeDetails = SLSUtils.parseNodesFromNodeFile(
         nodeFile, Resources.createResource(1024, 2));
-    Assert.assertEquals(20, nodeResourceMap.size());
+    Assert.assertEquals(20, nodeDetails.size());
 
     nodeFile = "src/test/resources/nodes-with-resources.json";
-    nodeResourceMap = SLSUtils.parseNodesFromNodeFile(
+    nodeDetails = SLSUtils.parseNodesFromNodeFile(
         nodeFile, Resources.createResource(1024, 2));
-    Assert.assertEquals(4,
-        nodeResourceMap.size());
-    Assert.assertEquals(2048,
-        nodeResourceMap.get("/rack1/node1").getMemorySize());
-    Assert.assertEquals(6,
-        nodeResourceMap.get("/rack1/node1").getVirtualCores());
-    Assert.assertEquals(1024,
-        nodeResourceMap.get("/rack1/node2").getMemorySize());
-    Assert.assertEquals(2,
-        nodeResourceMap.get("/rack1/node2").getVirtualCores());
+    Assert.assertEquals(4, nodeDetails.size());
+    for (NodeDetails nodeDetail : nodeDetails) {
+      if (nodeDetail.getHostname().equals("/rack1/node1")) {
+        Assert.assertEquals(2048,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(6,
+            nodeDetail.getNodeResource().getVirtualCores());
+      } else if (nodeDetail.getHostname().equals("/rack1/node2")) {
+        Assert.assertEquals(1024,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(2,
+            nodeDetail.getNodeResource().getVirtualCores());
+        Assert.assertNull(nodeDetail.getLabels());
+      } else if (nodeDetail.getHostname().equals("/rack1/node3")) {
+        Assert.assertEquals(1024,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(2,
+            nodeDetail.getNodeResource().getVirtualCores());
+        Assert.assertEquals(2, nodeDetail.getLabels().size());
+        for (NodeLabel nodeLabel : nodeDetail.getLabels()) {
+          if (nodeLabel.getName().equals("label1")) {
+            Assert.assertTrue(nodeLabel.isExclusive());
+          } else if(nodeLabel.getName().equals("label2")) {
+            Assert.assertFalse(nodeLabel.isExclusive());
+          } else {
+            Assert.assertTrue("Unexepected label", false);
+          }
+        }
+      } else if (nodeDetail.getHostname().equals("/rack1/node4")) {
+        Assert.assertEquals(6144,
+            nodeDetail.getNodeResource().getMemorySize());
+        Assert.assertEquals(12,
+            nodeDetail.getNodeResource().getVirtualCores());
+        Assert.assertEquals(2, nodeDetail.getLabels().size());
+      }
+    }
   }
 
   @Test
   public void testGenerateNodes() {
-    Set<? extends String> nodes = SLSUtils.generateNodes(3, 3);
+    Set<NodeDetails> nodes = SLSUtils.generateNodes(3, 3);
     Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
     Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
 
@@ -83,10 +109,10 @@ public class TestSLSUtils {
     Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
   }
 
-  private int getNumRack(Set<? extends String> nodes) {
+  private int getNumRack(Set<NodeDetails> nodes) {
     Set<String> racks = new HashSet<>();
-    for (String node : nodes) {
-      String[] rackHostname = SLSUtils.getRackHostName(node);
+    for (NodeDetails node : nodes) {
+      String[] rackHostname = SLSUtils.getRackHostName(node.getHostname());
       racks.add(rackHostname[0]);
     }
     return racks.size();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json b/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json
index 0039181..dc5f020 100644
--- a/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json
+++ b/hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json
@@ -10,10 +10,14 @@
       "node": "node2"
     },
     {
-      "node": "node3"
+      "node": "node3",
+      "labels": "label1, label2(exclusive=false)"
     },
     {
-      "node": "node4"
+      "node": "node4",
+      "labels": "label1, label2(exclusive=false)",
+      "memory-mb" : 6144,
+      "vcores" : 12
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index 8d1d56b..a24c398 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.client.cli;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.RMHAServiceTarget;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +82,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+
+import static org.apache.hadoop.yarn.client.util.YarnClientUtils.NO_LABEL_ERR_MSG;
 
 @Private
 @Unstable
@@ -91,15 +92,10 @@ public class RMAdminCLI extends HAAdmin {
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
   static CommonNodeLabelsManager localNodeLabelsManager = null;
-  private static final String NO_LABEL_ERR_MSG =
-      "No cluster node-labels are specified";
   private static final String NO_MAPPING_ERR_MSG =
       "No node-to-labels mappings are specified";
   private static final String INVALID_TIMEOUT_ERR_MSG =
       "Invalid timeout specified : ";
-  private static final String ADD_LABEL_FORMAT_ERR_MSG =
-      "Input format for adding node-labels is not correct, it should be "
-          + "labelName1[(exclusive=true/false)],LabelName2[] ..";
   private static final Pattern RESOURCE_TYPES_ARGS_PATTERN =
       Pattern.compile("^[0-9]*$");
 
@@ -533,65 +529,6 @@ public class RMAdminCLI extends HAAdmin {
     }
     return localNodeLabelsManager;
   }
-  
-  private List<NodeLabel> buildNodeLabelsFromStr(String args) {
-    List<NodeLabel> nodeLabels = new ArrayList<>();
-    for (String p : args.split(",")) {
-      if (!p.trim().isEmpty()) {
-        String labelName = p;
-
-        // Try to parse exclusive
-        boolean exclusive = NodeLabel.DEFAULT_NODE_LABEL_EXCLUSIVITY;
-        int leftParenthesisIdx = p.indexOf("(");
-        int rightParenthesisIdx = p.indexOf(")");
-
-        if ((leftParenthesisIdx == -1 && rightParenthesisIdx != -1)
-            || (leftParenthesisIdx != -1 && rightParenthesisIdx == -1)) {
-          // Parenthese not match
-          throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-        }
-
-        if (leftParenthesisIdx > 0 && rightParenthesisIdx > 0) {
-          if (leftParenthesisIdx > rightParenthesisIdx) {
-            // Parentese not match
-            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-          }
-
-          String property = p.substring(p.indexOf("(") + 1, p.indexOf(")"));
-          if (property.contains("=")) {
-            String key = property.substring(0, property.indexOf("=")).trim();
-            String value =
-                property
-                    .substring(property.indexOf("=") + 1, property.length())
-                    .trim();
-
-            // Now we only support one property, which is exclusive, so check if
-            // key = exclusive and value = {true/false}
-            if (key.equals("exclusive")
-                && ImmutableSet.of("true", "false").contains(value)) {
-              exclusive = Boolean.parseBoolean(value);
-            } else {
-              throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-            }
-          } else if (!property.trim().isEmpty()) {
-            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
-          }
-        }
-
-        // Try to get labelName if there's "(..)"
-        if (labelName.contains("(")) {
-          labelName = labelName.substring(0, labelName.indexOf("(")).trim();
-        }
-
-        nodeLabels.add(NodeLabel.newInstance(labelName, exclusive));
-      }
-    }
-
-    if (nodeLabels.isEmpty()) {
-      throw new IllegalArgumentException(NO_LABEL_ERR_MSG);
-    }
-    return nodeLabels;
-  }
 
   private Set<String> buildNodeLabelNamesFromStr(String args) {
     Set<String> labels = new HashSet<String>();
@@ -624,7 +561,7 @@ public class RMAdminCLI extends HAAdmin {
       return exitCode;
     }
 
-    List<NodeLabel> labels = buildNodeLabelsFromStr(
+    List<NodeLabel> labels = YarnClientUtils.buildNodeLabelsFromStr(
         cliParser.getOptionValue("addToClusterNodeLabels"));
     if (cliParser.hasOption("directlyAccessNodeLabelStore")) {
       getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fea5c9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
index 1e3112a..1717675 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/YarnClientUtils.java
@@ -19,8 +19,13 @@ package org.apache.hadoop.yarn.client.util;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
@@ -29,6 +34,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
  * YARN clients.
  */
 public abstract class YarnClientUtils {
+
+  private static final String ADD_LABEL_FORMAT_ERR_MSG =
+      "Input format for adding node-labels is not correct, it should be "
+          + "labelName1[(exclusive=true/false)],LabelName2[] ..";
+
+  public static final String NO_LABEL_ERR_MSG =
+      "No cluster node-labels are specified";
+
   /**
    * Look up and return the resource manager's principal. This method
    * automatically does the <code>_HOST</code> replacement in the principal and
@@ -80,6 +93,70 @@ public abstract class YarnClientUtils {
   }
 
   /**
+   * Creates node labels from string
+   * @param args nodelabels string to be parsed
+   * @return list of node labels
+   */
+  public static List<NodeLabel> buildNodeLabelsFromStr(String args) {
+    List<NodeLabel> nodeLabels = new ArrayList<>();
+    for (String p : args.split(",")) {
+      if (!p.trim().isEmpty()) {
+        String labelName = p;
+
+        // Try to parse exclusive
+        boolean exclusive = NodeLabel.DEFAULT_NODE_LABEL_EXCLUSIVITY;
+        int leftParenthesisIdx = p.indexOf("(");
+        int rightParenthesisIdx = p.indexOf(")");
+
+        if ((leftParenthesisIdx == -1 && rightParenthesisIdx != -1)
+            || (leftParenthesisIdx != -1 && rightParenthesisIdx == -1)) {
+          // Parentheses not match
+          throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+        }
+
+        if (leftParenthesisIdx > 0 && rightParenthesisIdx > 0) {
+          if (leftParenthesisIdx > rightParenthesisIdx) {
+            // Parentheses not match
+            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+          }
+
+          String property = p.substring(p.indexOf("(") + 1, p.indexOf(")"));
+          if (property.contains("=")) {
+            String key = property.substring(0, property.indexOf("=")).trim();
+            String value =
+                property
+                    .substring(property.indexOf("=") + 1, property.length())
+                    .trim();
+
+            // Now we only support one property, which is exclusive, so check if
+            // key = exclusive and value = {true/false}
+            if (key.equals("exclusive")
+                && ImmutableSet.of("true", "false").contains(value)) {
+              exclusive = Boolean.parseBoolean(value);
+            } else {
+              throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+            }
+          } else if (!property.trim().isEmpty()) {
+            throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
+          }
+        }
+
+        // Try to get labelName if there's "(..)"
+        if (labelName.contains("(")) {
+          labelName = labelName.substring(0, labelName.indexOf("(")).trim();
+        }
+
+        nodeLabels.add(NodeLabel.newInstance(labelName, exclusive));
+      }
+    }
+
+    if (nodeLabels.isEmpty()) {
+      throw new IllegalArgumentException(NO_LABEL_ERR_MSG);
+    }
+    return nodeLabels;
+  }
+
+  /**
    * Returns a {@link YarnConfiguration} built from the {@code conf} parameter
    * that is guaranteed to have the {@link YarnConfiguration#RM_HA_ID}
    * property set.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org