You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/02/15 23:50:44 UTC

[15/50] [abbrv] hadoop git commit: YARN-7838. Support AND/OR constraints in Distributed Shell. Contributed by Weiwei Yang.

YARN-7838. Support AND/OR constraints in Distributed Shell. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a08c0488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a08c0488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a08c0488

Branch: refs/heads/HDFS-7240
Commit: a08c048832d68c203fbdfce8d9f0e7dcccb02a55
Parents: 25fbec6
Author: Weiwei Yang <ww...@apache.org>
Authored: Sun Feb 11 14:20:46 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Sun Feb 11 14:20:46 2018 +0800

----------------------------------------------------------------------
 .../PlacementConstraintParseException.java      |  28 +
 .../constraint/PlacementConstraintParser.java   | 615 +++++++++++++++++++
 .../yarn/util/constraint/package-info.java      |  22 +
 .../resource/TestPlacementConstraintParser.java | 372 +++++++++++
 .../distributedshell/ApplicationMaster.java     |  10 +-
 .../applications/distributedshell/Client.java   |   8 +-
 .../distributedshell/PlacementSpec.java         |  86 +--
 7 files changed, 1075 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java
new file mode 100644
index 0000000..8f3e28c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util.constraint;
+
+/**
+ * Exception when the placement constraint parser fails to parse an expression.
+ */
+public class PlacementConstraintParseException extends Exception {
+
+  public PlacementConstraintParseException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
new file mode 100644
index 0000000..603e692
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util.constraint;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.StringTokenizer;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Stack;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Placement constraint expression parser.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class PlacementConstraintParser {
+
+  private static final char EXPRESSION_DELIM = ':';
+  private static final char KV_SPLIT_DELIM = '=';
+  private static final char EXPRESSION_VAL_DELIM = ',';
+  private static final char BRACKET_START = '(';
+  private static final char BRACKET_END = ')';
+  private static final String IN = "in";
+  private static final String NOT_IN = "notin";
+  private static final String AND = "and";
+  private static final String OR = "or";
+  private static final String CARDINALITY = "cardinality";
+  private static final String SCOPE_NODE = PlacementConstraints.NODE;
+  private static final String SCOPE_RACK = PlacementConstraints.RACK;
+
+  private PlacementConstraintParser() {
+    // Private constructor for this utility class.
+  }
+
+  /**
+   * Constraint Parser used to parse placement constraints from a
+   * given expression.
+   */
+  public static abstract class ConstraintParser {
+
+    private final ConstraintTokenizer tokenizer;
+
+    public ConstraintParser(ConstraintTokenizer tk){
+      this.tokenizer = tk;
+    }
+
+    void validate() throws PlacementConstraintParseException {
+      tokenizer.validate();
+    }
+
+    void shouldHaveNext()
+        throws PlacementConstraintParseException {
+      if (!tokenizer.hasMoreElements()) {
+        throw new PlacementConstraintParseException("Expecting more tokens");
+      }
+    }
+
+    String nextToken() {
+      return this.tokenizer.nextElement().trim();
+    }
+
+    boolean hasMoreTokens() {
+      return this.tokenizer.hasMoreElements();
+    }
+
+    int toInt(String name) throws PlacementConstraintParseException {
+      try {
+        return Integer.parseInt(name);
+      } catch (NumberFormatException e) {
+        throw new PlacementConstraintParseException(
+            "Expecting an Integer, but get " + name);
+      }
+    }
+
+    String parseScope(String scopeString)
+        throws PlacementConstraintParseException {
+      if (scopeString.equalsIgnoreCase(SCOPE_NODE)) {
+        return SCOPE_NODE;
+      } else if (scopeString.equalsIgnoreCase(SCOPE_RACK)) {
+        return SCOPE_RACK;
+      } else {
+        throw new PlacementConstraintParseException(
+            "expecting scope to " + SCOPE_NODE + " or " + SCOPE_RACK
+                + ", but met " + scopeString);
+      }
+    }
+
+    public AbstractConstraint tryParse() {
+      try {
+        return parse();
+      } catch (PlacementConstraintParseException e) {
+        // unable to parse, simply return null
+        return null;
+      }
+    }
+
+    public abstract AbstractConstraint parse()
+        throws PlacementConstraintParseException;
+  }
+
+  /**
+   * Tokenizer interface that used to parse an expression. It first
+   * validates if the syntax of the given expression is valid, then traverse
+   * the expression and parse it to an enumeration of strings. Each parsed
+   * string can be further consumed by a {@link ConstraintParser} and
+   * transformed to a {@link AbstractConstraint}.
+   */
+  public interface ConstraintTokenizer extends Enumeration<String> {
+
+    /**
+     * Validate the schema before actual parsing the expression.
+     * @throws PlacementConstraintParseException
+     */
+    default void validate() throws PlacementConstraintParseException {
+      // do nothing
+    }
+  }
+
+  /**
+   * A basic tokenizer that splits an expression by a given delimiter.
+   */
+  public static class BaseStringTokenizer implements ConstraintTokenizer {
+    private final StringTokenizer tokenizer;
+    BaseStringTokenizer(String expr, String delimiter) {
+      this.tokenizer = new StringTokenizer(expr, delimiter);
+    }
+
+    @Override
+    public boolean hasMoreElements() {
+      return tokenizer.hasMoreTokens();
+    }
+
+    @Override
+    public String nextElement() {
+      return tokenizer.nextToken();
+    }
+  }
+
+  /**
+   * Tokenizer used to parse conjunction form of a constraint expression,
+   * [AND|OR](C1:C2:...:Cn). Each Cn is a constraint expression.
+   */
+  public static final class ConjunctionTokenizer
+      implements ConstraintTokenizer {
+
+    private final String expression;
+    private Iterator<String> iterator;
+
+    private ConjunctionTokenizer(String expr) {
+      this.expression = expr;
+    }
+
+    // Traverse the expression and try to get a list of parsed elements
+    // based on schema.
+    @Override
+    public void validate() throws PlacementConstraintParseException {
+      List<String> parsedElements = new ArrayList<>();
+      // expression should start with AND or OR
+      String op;
+      if (expression.startsWith(AND) ||
+          expression.startsWith(AND.toUpperCase())) {
+        op = AND;
+      } else if(expression.startsWith(OR) ||
+          expression.startsWith(OR.toUpperCase())) {
+        op = OR;
+      } else {
+        throw new PlacementConstraintParseException(
+            "Excepting starting with \"" + AND + "\" or \"" + OR + "\","
+                + " but met " + expression);
+      }
+      parsedElements.add(op);
+      Pattern p = Pattern.compile("\\((.*)\\)");
+      Matcher m = p.matcher(expression);
+      if (!m.find()) {
+        throw new PlacementConstraintParseException("Unexpected format,"
+            + " expecting [AND|OR](A:B...) "
+            + "but current expression is " + expression);
+      }
+      String childStrs = m.group(1);
+      MultipleConstraintsTokenizer ct =
+          new MultipleConstraintsTokenizer(childStrs);
+      ct.validate();
+      while(ct.hasMoreElements()) {
+        parsedElements.add(ct.nextElement());
+      }
+      this.iterator = parsedElements.iterator();
+    }
+
+    @Override
+    public boolean hasMoreElements() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public String nextElement() {
+      return iterator.next();
+    }
+  }
+
+  /**
+   * Tokenizer used to parse allocation tags expression, which should be
+   * in tag=numOfAllocations syntax.
+   */
+  public static class SourceTagsTokenizer implements ConstraintTokenizer {
+
+    private final String expression;
+    private StringTokenizer st;
+    private Iterator<String> iterator;
+    public SourceTagsTokenizer(String expr) {
+      this.expression = expr;
+      st = new StringTokenizer(expr, String.valueOf(KV_SPLIT_DELIM));
+    }
+
+    @Override
+    public void validate() throws PlacementConstraintParseException {
+      ArrayList<String> parsedValues = new ArrayList<>();
+      if (st.countTokens() != 2) {
+        throw new PlacementConstraintParseException(
+            "Expecting source allocation tag to be specified"
+                + " sourceTag=numOfAllocations syntax,"
+                + " but met " + expression);
+      }
+
+      String sourceTag = st.nextToken();
+      parsedValues.add(sourceTag);
+      String num = st.nextToken();
+      try {
+        Integer.parseInt(num);
+        parsedValues.add(num);
+      } catch (NumberFormatException e) {
+        throw new PlacementConstraintParseException("Value of the expression"
+            + " must be an integer, but met " + num);
+      }
+      iterator = parsedValues.iterator();
+    }
+
+    @Override
+    public boolean hasMoreElements() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public String nextElement() {
+      return iterator.next();
+    }
+  }
+
+  /**
+   * Tokenizer used to handle a placement spec composed by multiple
+   * constraint expressions. Each of them is delimited with the
+   * given delimiter, e.g ':'.
+   */
+  public static class MultipleConstraintsTokenizer
+      implements ConstraintTokenizer {
+
+    private final String expr;
+    private Iterator<String> iterator;
+
+    public MultipleConstraintsTokenizer(String expression) {
+      this.expr = expression;
+    }
+
+    @Override
+    public void validate() throws PlacementConstraintParseException {
+      ArrayList<String> parsedElements = new ArrayList<>();
+      char[] arr = expr.toCharArray();
+      // Memorize the location of each delimiter in a stack,
+      // removes invalid delimiters that embraced in brackets.
+      Stack<Integer> stack = new Stack<>();
+      for (int i=0; i<arr.length; i++) {
+        char current = arr[i];
+        switch (current) {
+        case EXPRESSION_DELIM:
+          stack.add(i);
+          break;
+        case BRACKET_START:
+          stack.add(i);
+          break;
+        case BRACKET_END:
+          while(!stack.isEmpty()) {
+            if (arr[stack.pop()] == BRACKET_START) {
+              break;
+            }
+          }
+          break;
+        default:
+          break;
+        }
+      }
+
+      if (stack.isEmpty()) {
+        // Single element
+        parsedElements.add(expr);
+      } else {
+        Iterator<Integer> it = stack.iterator();
+        int currentPos = 0;
+        while (it.hasNext()) {
+          int pos = it.next();
+          String sub = expr.substring(currentPos, pos);
+          if (sub != null && !sub.isEmpty()) {
+            parsedElements.add(sub);
+          }
+          currentPos = pos+1;
+        }
+        if (currentPos < expr.length()) {
+          parsedElements.add(expr.substring(currentPos, expr.length()));
+        }
+      }
+      iterator = parsedElements.iterator();
+    }
+
+    @Override
+    public boolean hasMoreElements() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public String nextElement() {
+      return iterator.next();
+    }
+  }
+
+  /**
+   * Constraint parser used to parse a given target expression, such as
+   * "NOTIN, NODE, foo, bar".
+   */
+  public static class TargetConstraintParser extends ConstraintParser {
+
+    public TargetConstraintParser(String expression) {
+      super(new BaseStringTokenizer(expression,
+          String.valueOf(EXPRESSION_VAL_DELIM)));
+    }
+
+    @Override
+    public AbstractConstraint parse()
+        throws PlacementConstraintParseException {
+      PlacementConstraint.AbstractConstraint placementConstraints;
+      String op = nextToken();
+      if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
+        String scope = nextToken();
+        scope = parseScope(scope);
+
+        Set<String> allocationTags = new TreeSet<>();
+        while(hasMoreTokens()) {
+          String tag = nextToken();
+          allocationTags.add(tag);
+        }
+        PlacementConstraint.TargetExpression target =
+            PlacementConstraints.PlacementTargets.allocationTag(
+                allocationTags.toArray(new String[allocationTags.size()]));
+        if (op.equalsIgnoreCase(IN)) {
+          placementConstraints = PlacementConstraints
+              .targetIn(scope, target);
+        } else {
+          placementConstraints = PlacementConstraints
+              .targetNotIn(scope, target);
+        }
+      } else {
+        throw new PlacementConstraintParseException(
+            "expecting " + IN + " or " + NOT_IN + ", but get " + op);
+      }
+      return placementConstraints;
+    }
+  }
+
+  /**
+   * Constraint parser used to parse a given target expression, such as
+   * "cardinality, NODE, foo, 0, 1".
+   */
+  public static class CardinalityConstraintParser extends ConstraintParser {
+
+    public CardinalityConstraintParser(String expr) {
+      super(new BaseStringTokenizer(expr,
+          String.valueOf(EXPRESSION_VAL_DELIM)));
+    }
+
+    @Override
+    public AbstractConstraint parse()
+        throws PlacementConstraintParseException {
+      String op = nextToken();
+      if (!op.equalsIgnoreCase(CARDINALITY)) {
+        throw new PlacementConstraintParseException("expecting " + CARDINALITY
+            + " , but met " + op);
+      }
+
+      shouldHaveNext();
+      String scope = nextToken();
+      scope = parseScope(scope);
+
+      Stack<String> resetElements = new Stack<>();
+      while(hasMoreTokens()) {
+        resetElements.add(nextToken());
+      }
+
+      // At least 3 elements
+      if (resetElements.size() < 3) {
+        throw new PlacementConstraintParseException(
+            "Invalid syntax for a cardinality expression, expecting"
+                + " \"cardinality,SCOPE,TARGET_TAG,...,TARGET_TAG,"
+                + "MIN_CARDINALITY,MAX_CARDINALITY\" at least 5 elements,"
+                + " but only " + (resetElements.size() + 2) + " is given.");
+      }
+
+      String maxCardinalityStr = resetElements.pop();
+      Integer max = toInt(maxCardinalityStr);
+
+      String minCardinalityStr = resetElements.pop();
+      Integer min = toInt(minCardinalityStr);
+
+      ArrayList<String> targetTags = new ArrayList<>();
+      while (!resetElements.empty()) {
+        targetTags.add(resetElements.pop());
+      }
+
+      return PlacementConstraints.cardinality(scope, min, max,
+          targetTags.toArray(new String[targetTags.size()]));
+    }
+  }
+
+  /**
+   * Parser used to parse conjunction form of constraints, such as
+   * AND(A, ..., B), OR(A, ..., B).
+   */
+  public static class ConjunctionConstraintParser extends ConstraintParser {
+
+    public ConjunctionConstraintParser(String expr) {
+      super(new ConjunctionTokenizer(expr));
+    }
+
+    @Override
+    public AbstractConstraint parse() throws PlacementConstraintParseException {
+      // do pre-process, validate input.
+      validate();
+      String op = nextToken();
+      shouldHaveNext();
+      List<AbstractConstraint> constraints = new ArrayList<>();
+      while(hasMoreTokens()) {
+        // each child expression can be any valid form of
+        // constraint expressions.
+        String constraintStr = nextToken();
+        AbstractConstraint constraint = parseExpression(constraintStr);
+        constraints.add(constraint);
+      }
+      if (AND.equalsIgnoreCase(op)) {
+        return PlacementConstraints.and(
+            constraints.toArray(
+                new AbstractConstraint[constraints.size()]));
+      } else if (OR.equalsIgnoreCase(op)) {
+        return PlacementConstraints.or(
+            constraints.toArray(
+                new AbstractConstraint[constraints.size()]));
+      } else {
+        throw new PlacementConstraintParseException(
+            "Unexpected conjunction operator : " + op
+                + ", expecting " + AND + " or " + OR);
+      }
+    }
+  }
+
+  /**
+   * A helper class to encapsulate source tags and allocations in the
+   * placement specification.
+   */
+  public static final class SourceTags {
+    private String tag;
+    private int num;
+
+    private SourceTags(String sourceTag, int number) {
+      this.tag = sourceTag;
+      this.num = number;
+    }
+
+    public String getTag() {
+      return this.tag;
+    }
+
+    public int getNumOfAllocations() {
+      return this.num;
+    }
+
+    /**
+     * Parses source tags from expression "sourceTags=numOfAllocations".
+     * @param expr
+     * @return source tags, see {@link SourceTags}
+     * @throws PlacementConstraintParseException
+     */
+    public static SourceTags parseFrom(String expr)
+        throws PlacementConstraintParseException {
+      SourceTagsTokenizer stt = new SourceTagsTokenizer(expr);
+      stt.validate();
+
+      // During validation we already checked the number of parsed elements.
+      String allocTag = stt.nextElement();
+      int allocNum = Integer.parseInt(stt.nextElement());
+      return new SourceTags(allocTag, allocNum);
+    }
+  }
+
+  /**
+   * Parses a given constraint expression to a {@link AbstractConstraint},
+   * this expression can be any valid form of constraint expressions.
+   *
+   * @param constraintStr expression string
+   * @return a parsed {@link AbstractConstraint}
+   * @throws PlacementConstraintParseException when given expression
+   * is malformed
+   */
+  public static AbstractConstraint parseExpression(String constraintStr)
+      throws PlacementConstraintParseException {
+    // Try parse given expression with all allowed constraint parsers,
+    // fails if no one could parse it.
+    TargetConstraintParser tp = new TargetConstraintParser(constraintStr);
+    Optional<AbstractConstraint> constraintOptional =
+        Optional.ofNullable(tp.tryParse());
+    if (!constraintOptional.isPresent()) {
+      CardinalityConstraintParser cp =
+          new CardinalityConstraintParser(constraintStr);
+      constraintOptional = Optional.ofNullable(cp.tryParse());
+      if (!constraintOptional.isPresent()) {
+        ConjunctionConstraintParser jp =
+            new ConjunctionConstraintParser(constraintStr);
+        constraintOptional = Optional.ofNullable(jp.tryParse());
+      }
+      if (!constraintOptional.isPresent()) {
+        throw new PlacementConstraintParseException(
+            "Invalid constraint expression " + constraintStr);
+      }
+    }
+    return constraintOptional.get();
+  }
+
+  /**
+   * Parses a placement constraint specification. A placement constraint spec
+   * is a composite expression which is composed by multiple sub constraint
+   * expressions delimited by ":". With following syntax:
+   *
+   * <p>Tag1=N1,P1:Tag2=N2,P2:...:TagN=Nn,Pn</p>
+   *
+   * where <b>TagN=Nn</b> is a key value pair to determine the source
+   * allocation tag and the number of allocations, such as:
+   *
+   * <p>foo=3</p>
+   *
+   * and where <b>Pn</b> can be any form of a valid constraint expression,
+   * such as:
+   *
+   * <ul>
+   *   <li>in,node,foo,bar</li>
+   *   <li>notin,node,foo,bar,1,2</li>
+   *   <li>and(notin,node,foo:notin,node,bar)</li>
+   * </ul>
+   * @param expression expression string.
+   * @return a map of source tags to placement constraint mapping.
+   * @throws PlacementConstraintParseException
+   */
+  public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
+      String expression) throws PlacementConstraintParseException {
+    // Respect insertion order.
+    Map<SourceTags, PlacementConstraint> result = new LinkedHashMap<>();
+    PlacementConstraintParser.ConstraintTokenizer tokenizer =
+        new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
+    tokenizer.validate();
+    while(tokenizer.hasMoreElements()) {
+      String specStr = tokenizer.nextElement();
+      // each spec starts with sourceAllocationTag=numOfContainers and
+      // followed by a constraint expression.
+      // foo=4,Pn
+      String[] splitted = specStr.split(
+          String.valueOf(EXPRESSION_VAL_DELIM), 2);
+      if (splitted.length != 2) {
+        throw new PlacementConstraintParseException(
+            "Unexpected placement constraint expression " + specStr);
+      }
+
+      String tagAlloc = splitted[0];
+      SourceTags st = SourceTags.parseFrom(tagAlloc);
+      String exprs = splitted[1];
+      AbstractConstraint constraint =
+          PlacementConstraintParser.parseExpression(exprs);
+
+      result.put(st, constraint.build());
+    }
+
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java
new file mode 100644
index 0000000..890d5ec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package org.apache.hadoop.yarn.util.constraint contains classes
+ * which is used as utility class for placement constraints.
+ */
+package org.apache.hadoop.yarn.util.constraint;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
new file mode 100644
index 0000000..941f971
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.resource;
+
+import com.google.common.collect.Sets;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.TargetConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.CardinalityConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConjunctionConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.MultipleConstraintsTokenizer;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Class to test placement constraint parser.
+ */
+public class TestPlacementConstraintParser {
+
+  @Test
+  public void testTargetExpressionParser()
+      throws PlacementConstraintParseException {
+    ConstraintParser parser;
+    AbstractConstraint constraint;
+    SingleConstraint single;
+
+    // Anti-affinity with single target tag
+    // NOTIN,NDOE,foo
+    parser = new TargetConstraintParser("NOTIN, NODE, foo");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals("node", single.getScope());
+    Assert.assertEquals(0, single.getMinCardinality());
+    Assert.assertEquals(0, single.getMaxCardinality());
+
+    // lower cases is also valid
+    parser = new TargetConstraintParser("notin, node, foo");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals("node", single.getScope());
+    Assert.assertEquals(0, single.getMinCardinality());
+    Assert.assertEquals(0, single.getMaxCardinality());
+
+    // Affinity with single target tag
+    // IN,NODE,foo
+    parser = new TargetConstraintParser("IN, NODE, foo");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals("node", single.getScope());
+    Assert.assertEquals(1, single.getMinCardinality());
+    Assert.assertEquals(Integer.MAX_VALUE, single.getMaxCardinality());
+
+    // Anti-affinity with multiple target tags
+    // NOTIN,NDOE,foo,bar,exp
+    parser = new TargetConstraintParser("NOTIN, NODE, foo, bar, exp");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals("node", single.getScope());
+    Assert.assertEquals(0, single.getMinCardinality());
+    Assert.assertEquals(0, single.getMaxCardinality());
+    Assert.assertEquals(1, single.getTargetExpressions().size());
+    TargetExpression exp =
+        single.getTargetExpressions().iterator().next();
+    Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
+    Assert.assertEquals(3, exp.getTargetValues().size());
+
+    // Invalid OP
+    parser = new TargetConstraintParser("XYZ, NODE, foo");
+    try {
+      parser.parse();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof PlacementConstraintParseException);
+      Assert.assertTrue(e.getMessage().contains("expecting in or notin"));
+    }
+  }
+
+  @Test
+  public void testCardinalityConstraintParser()
+      throws PlacementConstraintParseException {
+    ConstraintParser parser;
+    AbstractConstraint constraint;
+    SingleConstraint single;
+
+    // cardinality,NODE,foo,0,1
+    parser = new CardinalityConstraintParser("cardinality, NODE, foo, 0, 1");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals("node", single.getScope());
+    Assert.assertEquals(0, single.getMinCardinality());
+    Assert.assertEquals(1, single.getMaxCardinality());
+    Assert.assertEquals(1, single.getTargetExpressions().size());
+    TargetExpression exp =
+        single.getTargetExpressions().iterator().next();
+    Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
+    Assert.assertEquals(1, exp.getTargetValues().size());
+    Assert.assertEquals("foo", exp.getTargetValues().iterator().next());
+
+    // cardinality,NODE,foo,bar,moo,0,1
+    parser = new CardinalityConstraintParser(
+        "cardinality,RACK,foo,bar,moo,0,1");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals("rack", single.getScope());
+    Assert.assertEquals(0, single.getMinCardinality());
+    Assert.assertEquals(1, single.getMaxCardinality());
+    Assert.assertEquals(1, single.getTargetExpressions().size());
+    exp = single.getTargetExpressions().iterator().next();
+    Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
+    Assert.assertEquals(3, exp.getTargetValues().size());
+    Set<String> expectedTags = Sets.newHashSet("foo", "bar", "moo");
+    Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues())
+        .isEmpty());
+
+    // Invalid scope string
+    try {
+      parser = new CardinalityConstraintParser(
+          "cardinality,NOWHERE,foo,bar,moo,0,1");
+      parser.parse();
+      Assert.fail("Expecting a parsing failure!");
+    } catch (PlacementConstraintParseException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("expecting scope to node or rack, but met NOWHERE"));
+    }
+
+    // Invalid number of expression elements
+    try {
+      parser = new CardinalityConstraintParser(
+          "cardinality,NODE,0,1");
+      parser.parse();
+      Assert.fail("Expecting a parsing failure!");
+    } catch (PlacementConstraintParseException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("at least 5 elements, but only 4 is given"));
+    }
+  }
+
+  @Test
+  public void testAndConstraintParser()
+      throws PlacementConstraintParseException {
+    ConstraintParser parser;
+    AbstractConstraint constraint;
+    And and;
+
+    parser = new ConjunctionConstraintParser(
+        "AND(NOTIN,NODE,foo:NOTIN,NODE,bar)");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof And);
+    and = (And) constraint;
+    Assert.assertEquals(2, and.getChildren().size());
+
+    parser = new ConjunctionConstraintParser(
+        "AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1)");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof And);
+    Assert.assertEquals(2, and.getChildren().size());
+
+    parser = new ConjunctionConstraintParser(
+        "AND(NOTIN,NODE,foo:AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1))");
+    constraint = parser.parse();
+    Assert.assertTrue(constraint instanceof And);
+    and = (And) constraint;
+    Assert.assertTrue(and.getChildren().get(0) instanceof SingleConstraint);
+    Assert.assertTrue(and.getChildren().get(1) instanceof And);
+    and = (And) and.getChildren().get(1);
+    Assert.assertEquals(2, and.getChildren().size());
+  }
+
+  @Test
+  public void testMultipleConstraintsTokenizer()
+      throws PlacementConstraintParseException {
+    MultipleConstraintsTokenizer ct;
+    SourceTagsTokenizer st;
+    TokenizerTester mp;
+
+    ct = new MultipleConstraintsTokenizer(
+        "foo=1,A1,A2,A3:bar=2,B1,B2:moo=3,C1,C2");
+    mp = new TokenizerTester(ct,
+        "foo=1,A1,A2,A3", "bar=2,B1,B2", "moo=3,C1,C2");
+    mp.verify();
+
+    ct = new MultipleConstraintsTokenizer(
+        "foo=1,AND(A2:A3):bar=2,OR(B1:AND(B2:B3)):moo=3,C1,C2");
+    mp = new TokenizerTester(ct,
+        "foo=1,AND(A2:A3)", "bar=2,OR(B1:AND(B2:B3))", "moo=3,C1,C2");
+    mp.verify();
+
+    ct = new MultipleConstraintsTokenizer("A:B:C");
+    mp = new TokenizerTester(ct, "A", "B", "C");
+    mp.verify();
+
+    ct = new MultipleConstraintsTokenizer("A:AND(B:C):D");
+    mp = new TokenizerTester(ct, "A", "AND(B:C)", "D");
+    mp.verify();
+
+    ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E");
+    mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
+    mp.verify();
+
+    ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E");
+    mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
+    mp.verify();
+
+    st = new SourceTagsTokenizer("A=4");
+    mp = new TokenizerTester(st, "A", "4");
+    mp.verify();
+
+    try {
+      st = new SourceTagsTokenizer("A=B");
+      mp = new TokenizerTester(st, "A", "B");
+      mp.verify();
+      Assert.fail("Expecting a parsing failure");
+    } catch (PlacementConstraintParseException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("Value of the expression must be an integer"));
+    }
+  }
+
+  private static class TokenizerTester {
+
+    private ConstraintTokenizer tokenizer;
+    private String[] expectedExtractions;
+
+    protected TokenizerTester(ConstraintTokenizer tk,
+        String... expctedStrings) {
+      this.tokenizer = tk;
+      this.expectedExtractions = expctedStrings;
+    }
+
+    void verify()
+        throws PlacementConstraintParseException {
+      tokenizer.validate();
+      int i = 0;
+      while (tokenizer.hasMoreElements()) {
+        String current = tokenizer.nextElement();
+        Assert.assertTrue(i < expectedExtractions.length);
+        Assert.assertEquals(expectedExtractions[i], current);
+        i++;
+      }
+    }
+  }
+
+  @Test
+  public void testParsePlacementSpec()
+      throws PlacementConstraintParseException {
+    Map<SourceTags, PlacementConstraint> result;
+    PlacementConstraint expectedPc1, expectedPc2;
+    PlacementConstraint actualPc1, actualPc2;
+    SourceTags tag1, tag2;
+
+    // A single anti-affinity constraint
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=3,notin,node,foo");
+    Assert.assertEquals(1, result.size());
+    tag1 = result.keySet().iterator().next();
+    Assert.assertEquals("foo", tag1.getTag());
+    Assert.assertEquals(3, tag1.getNumOfAllocations());
+    expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
+    actualPc1 = result.values().iterator().next();
+    Assert.assertEquals(expectedPc1, actualPc1);
+
+    // Upper case
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=3,NOTIN,NODE,foo");
+    Assert.assertEquals(1, result.size());
+    tag1 = result.keySet().iterator().next();
+    Assert.assertEquals("foo", tag1.getTag());
+    Assert.assertEquals(3, tag1.getNumOfAllocations());
+    expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
+    actualPc1 = result.values().iterator().next();
+    Assert.assertEquals(expectedPc1, actualPc1);
+
+    // A single cardinality constraint
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=10,cardinality,node,foo,bar,0,100");
+    Assert.assertEquals(1, result.size());
+    tag1 = result.keySet().iterator().next();
+    Assert.assertEquals("foo", tag1.getTag());
+    Assert.assertEquals(10, tag1.getNumOfAllocations());
+    expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build();
+    Assert.assertEquals(expectedPc1, result.values().iterator().next());
+
+    // Two constraint expressions
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo");
+    Assert.assertEquals(2, result.size());
+    Iterator<SourceTags> keyIt = result.keySet().iterator();
+    tag1 = keyIt.next();
+    Assert.assertEquals("foo", tag1.getTag());
+    Assert.assertEquals(3, tag1.getNumOfAllocations());
+    tag2 = keyIt.next();
+    Assert.assertEquals("bar", tag2.getTag());
+    Assert.assertEquals(2, tag2.getNumOfAllocations());
+    Iterator<PlacementConstraint> valueIt = result.values().iterator();
+    expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
+    expectedPc2 = targetIn("node", allocationTag("foo")).build();
+    Assert.assertEquals(expectedPc1, valueIt.next());
+    Assert.assertEquals(expectedPc2, valueIt.next());
+
+    // And constraint
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=1000,and(notin,node,bar:in,node,foo)");
+    Assert.assertEquals(1, result.size());
+    keyIt = result.keySet().iterator();
+    tag1 = keyIt.next();
+    Assert.assertEquals("foo", tag1.getTag());
+    Assert.assertEquals(1000, tag1.getNumOfAllocations());
+    actualPc1 = result.values().iterator().next();
+    expectedPc1 = and(targetNotIn("node", allocationTag("bar")),
+        targetIn("node", allocationTag("foo"))).build();
+    Assert.assertEquals(expectedPc1, actualPc1);
+
+    // Multiple constraints with nested forms.
+    result = PlacementConstraintParser.parsePlacementSpec(
+            "foo=1000,and(notin,node,bar:or(in,node,foo:in,node,moo))"
+                + ":bar=200,notin,node,foo");
+    Assert.assertEquals(2, result.size());
+    keyIt = result.keySet().iterator();
+    tag1 = keyIt.next();
+    tag2 = keyIt.next();
+    Assert.assertEquals("foo", tag1.getTag());
+    Assert.assertEquals(1000, tag1.getNumOfAllocations());
+    Assert.assertEquals("bar", tag2.getTag());
+    Assert.assertEquals(200, tag2.getNumOfAllocations());
+    valueIt = result.values().iterator();
+    actualPc1 = valueIt.next();
+    actualPc2 = valueIt.next();
+
+    expectedPc1 = and(targetNotIn("node", allocationTag("bar")),
+        or(targetIn("node", allocationTag("foo")),
+            targetIn("node", allocationTag("moo")))).build();
+    Assert.assertEquals(actualPc1, expectedPc1);
+    expectedPc2 = targetNotIn("node", allocationTag("foo")).build();
+    Assert.assertEquals(expectedPc2, actualPc2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 9ba2138..a06ee7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -28,6 +28,7 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
+import java.util.Base64;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -671,8 +673,14 @@ public class ApplicationMaster {
   }
 
   private void parsePlacementSpecs(String placementSpecifications) {
+    // Client sends placement spec in encoded format
+    Base64.Decoder decoder = Base64.getDecoder();
+    byte[] decodedBytes = decoder.decode(
+        placementSpecifications.getBytes(StandardCharsets.UTF_8));
+    String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
+    LOG.info("Decode placement spec: " + decodedSpec);
     Map<String, PlacementSpec> pSpecs =
-        PlacementSpec.parse(placementSpecifications);
+        PlacementSpec.parse(decodedSpec);
     this.placementSpecs = new HashMap<>();
     this.numTotalContainers = 0;
     for (PlacementSpec pSpec : pSpecs.values()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 0aef83f..ac58662 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
 import java.util.Arrays;
+import java.util.Base64;
 
 import com.google.common.base.Joiner;
 import org.apache.commons.cli.CommandLine;
@@ -857,7 +859,11 @@ public class Client {
     }
     vargs.add("--num_containers " + String.valueOf(numContainers));
     if (placementSpec != null && placementSpec.length() > 0) {
-      vargs.add("--placement_spec " + placementSpec);
+      // Encode the spec to avoid passing special chars via shell arguments.
+      String encodedSpec = Base64.getEncoder()
+          .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
+      LOG.info("Encode placement spec: " + encodedSpec);
+      vargs.add("--placement_spec " + encodedSpec);
     }
     if (null != nodeLabelExpression) {
       appContext.setNodeLabelExpression(nodeLabelExpression);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
index ed13ee0..2909259 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.yarn.applications.distributedshell;
 
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Scanner;
 
 /**
  * Class encapsulating a SourceTag, number of container and a Placement
@@ -34,12 +35,6 @@ public class PlacementSpec {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(PlacementSpec.class);
-  private static final String SPEC_DELIM = ":";
-  private static final String KV_SPLIT_DELIM = "=";
-  private static final String SPEC_VAL_DELIM = ",";
-  private static final String IN = "in";
-  private static final String NOT_IN = "notin";
-  private static final String CARDINALITY = "cardinality";
 
   public final String sourceTag;
   public final int numContainers;
@@ -73,65 +68,28 @@ public class PlacementSpec {
    * @param specs Placement spec.
    * @return Mapping from source tag to placement constraint.
    */
-  public static Map<String, PlacementSpec> parse(String specs) {
+  public static Map<String, PlacementSpec> parse(String specs)
+      throws IllegalArgumentException {
     LOG.info("Parsing Placement Specs: [{}]", specs);
-    Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM);
     Map<String, PlacementSpec> pSpecs = new HashMap<>();
-    while (s.hasNext()) {
-      String sp = s.next();
-      LOG.info("Parsing Spec: [{}]", sp);
-      String[] specSplit = sp.split(KV_SPLIT_DELIM);
-      String sourceTag = specSplit[0];
-      Scanner ps = new Scanner(specSplit[1]).useDelimiter(SPEC_VAL_DELIM);
-      int numContainers = ps.nextInt();
-      if (!ps.hasNext()) {
-        pSpecs.put(sourceTag,
-            new PlacementSpec(sourceTag, numContainers, null));
-        LOG.info("Creating Spec without constraint {}: num[{}]",
-            sourceTag, numContainers);
-        continue;
+    Map<SourceTags, PlacementConstraint> parsed;
+    try {
+      parsed = PlacementConstraintParser.parsePlacementSpec(specs);
+      for (Map.Entry<SourceTags, PlacementConstraint> entry :
+          parsed.entrySet()) {
+        LOG.info("Parsed source tag: {}, number of allocations: {}",
+            entry.getKey().getTag(), entry.getKey().getNumOfAllocations());
+        LOG.info("Parsed constraint: {}", entry.getValue()
+            .getConstraintExpr().getClass().getSimpleName());
+        pSpecs.put(entry.getKey().getTag(), new PlacementSpec(
+            entry.getKey().getTag(),
+            entry.getKey().getNumOfAllocations(),
+            entry.getValue()));
       }
-      String cType = ps.next().toLowerCase();
-      String scope = ps.next().toLowerCase();
-
-      String targetTag = ps.next();
-      scope = scope.equals("rack") ? PlacementConstraints.RACK :
-          PlacementConstraints.NODE;
-
-      PlacementConstraint pc;
-      if (cType.equals(IN)) {
-        pc = PlacementConstraints.build(
-            PlacementConstraints.targetIn(scope,
-                PlacementConstraints.PlacementTargets.allocationTag(
-                    targetTag)));
-        LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " +
-                "scope[{}], target[{}]",
-            sourceTag, numContainers, scope, targetTag);
-      } else if (cType.equals(NOT_IN)) {
-        pc = PlacementConstraints.build(
-            PlacementConstraints.targetNotIn(scope,
-                PlacementConstraints.PlacementTargets.allocationTag(
-                    targetTag)));
-        LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " +
-                "scope[{}], target[{}]",
-            sourceTag, numContainers, scope, targetTag);
-      } else if (cType.equals(CARDINALITY)) {
-        int minCard = ps.nextInt();
-        int maxCard = ps.nextInt();
-        pc = PlacementConstraints.build(
-            PlacementConstraints.targetCardinality(scope, minCard, maxCard,
-                PlacementConstraints.PlacementTargets.allocationTag(
-                    targetTag)));
-        LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " +
-                "scope[{}], min[{}], max[{}], target[{}]",
-            sourceTag, numContainers, scope, minCard, maxCard, targetTag);
-      } else {
-        throw new RuntimeException(
-            "Could not parse constraintType [" + cType + "]" +
-                " in [" + specSplit[1] + "]");
-      }
-      pSpecs.put(sourceTag, new PlacementSpec(sourceTag, numContainers, pc));
+      return pSpecs;
+    } catch (PlacementConstraintParseException e) {
+      throw new IllegalArgumentException(
+          "Invalid placement spec: " + specs, e);
     }
-    return pSpecs;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org