You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2018/02/11 06:21:24 UTC
hadoop git commit: YARN-7838. Support AND/OR constraints in
Distributed Shell. Contributed by Weiwei Yang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 25fbec67d -> a08c04883
YARN-7838. Support AND/OR constraints in Distributed Shell. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a08c0488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a08c0488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a08c0488
Branch: refs/heads/trunk
Commit: a08c048832d68c203fbdfce8d9f0e7dcccb02a55
Parents: 25fbec6
Author: Weiwei Yang <ww...@apache.org>
Authored: Sun Feb 11 14:20:46 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Sun Feb 11 14:20:46 2018 +0800
----------------------------------------------------------------------
.../PlacementConstraintParseException.java | 28 +
.../constraint/PlacementConstraintParser.java | 615 +++++++++++++++++++
.../yarn/util/constraint/package-info.java | 22 +
.../resource/TestPlacementConstraintParser.java | 372 +++++++++++
.../distributedshell/ApplicationMaster.java | 10 +-
.../applications/distributedshell/Client.java | 8 +-
.../distributedshell/PlacementSpec.java | 86 +--
7 files changed, 1075 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java
new file mode 100644
index 0000000..8f3e28c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParseException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util.constraint;
+
+/**
+ * Exception when the placement constraint parser fails to parse an expression.
+ */
+public class PlacementConstraintParseException extends Exception {
+
+ public PlacementConstraintParseException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
new file mode 100644
index 0000000..603e692
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util.constraint;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.StringTokenizer;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Stack;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Placement constraint expression parser.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class PlacementConstraintParser {
+
+ private static final char EXPRESSION_DELIM = ':';
+ private static final char KV_SPLIT_DELIM = '=';
+ private static final char EXPRESSION_VAL_DELIM = ',';
+ private static final char BRACKET_START = '(';
+ private static final char BRACKET_END = ')';
+ private static final String IN = "in";
+ private static final String NOT_IN = "notin";
+ private static final String AND = "and";
+ private static final String OR = "or";
+ private static final String CARDINALITY = "cardinality";
+ private static final String SCOPE_NODE = PlacementConstraints.NODE;
+ private static final String SCOPE_RACK = PlacementConstraints.RACK;
+
+ private PlacementConstraintParser() {
+ // Private constructor for this utility class.
+ }
+
+ /**
+ * Constraint Parser used to parse placement constraints from a
+ * given expression.
+ */
+ public static abstract class ConstraintParser {
+
+ private final ConstraintTokenizer tokenizer;
+
+ public ConstraintParser(ConstraintTokenizer tk){
+ this.tokenizer = tk;
+ }
+
+ void validate() throws PlacementConstraintParseException {
+ tokenizer.validate();
+ }
+
+ void shouldHaveNext()
+ throws PlacementConstraintParseException {
+ if (!tokenizer.hasMoreElements()) {
+ throw new PlacementConstraintParseException("Expecting more tokens");
+ }
+ }
+
+ String nextToken() {
+ return this.tokenizer.nextElement().trim();
+ }
+
+ boolean hasMoreTokens() {
+ return this.tokenizer.hasMoreElements();
+ }
+
+ int toInt(String name) throws PlacementConstraintParseException {
+ try {
+ return Integer.parseInt(name);
+ } catch (NumberFormatException e) {
+ throw new PlacementConstraintParseException(
+ "Expecting an Integer, but get " + name);
+ }
+ }
+
+ String parseScope(String scopeString)
+ throws PlacementConstraintParseException {
+ if (scopeString.equalsIgnoreCase(SCOPE_NODE)) {
+ return SCOPE_NODE;
+ } else if (scopeString.equalsIgnoreCase(SCOPE_RACK)) {
+ return SCOPE_RACK;
+ } else {
+ throw new PlacementConstraintParseException(
+ "expecting scope to " + SCOPE_NODE + " or " + SCOPE_RACK
+ + ", but met " + scopeString);
+ }
+ }
+
+ public AbstractConstraint tryParse() {
+ try {
+ return parse();
+ } catch (PlacementConstraintParseException e) {
+ // unable to parse, simply return null
+ return null;
+ }
+ }
+
+ public abstract AbstractConstraint parse()
+ throws PlacementConstraintParseException;
+ }
+
+ /**
+ * Tokenizer interface that used to parse an expression. It first
+ * validates if the syntax of the given expression is valid, then traverse
+ * the expression and parse it to an enumeration of strings. Each parsed
+ * string can be further consumed by a {@link ConstraintParser} and
+ * transformed to a {@link AbstractConstraint}.
+ */
+ public interface ConstraintTokenizer extends Enumeration<String> {
+
+ /**
+ * Validate the schema before actual parsing the expression.
+ * @throws PlacementConstraintParseException
+ */
+ default void validate() throws PlacementConstraintParseException {
+ // do nothing
+ }
+ }
+
+ /**
+ * A basic tokenizer that splits an expression by a given delimiter.
+ */
+ public static class BaseStringTokenizer implements ConstraintTokenizer {
+ private final StringTokenizer tokenizer;
+ BaseStringTokenizer(String expr, String delimiter) {
+ this.tokenizer = new StringTokenizer(expr, delimiter);
+ }
+
+ @Override
+ public boolean hasMoreElements() {
+ return tokenizer.hasMoreTokens();
+ }
+
+ @Override
+ public String nextElement() {
+ return tokenizer.nextToken();
+ }
+ }
+
+ /**
+ * Tokenizer used to parse conjunction form of a constraint expression,
+ * [AND|OR](C1:C2:...:Cn). Each Cn is a constraint expression.
+ */
+ public static final class ConjunctionTokenizer
+ implements ConstraintTokenizer {
+
+ private final String expression;
+ private Iterator<String> iterator;
+
+ private ConjunctionTokenizer(String expr) {
+ this.expression = expr;
+ }
+
+ // Traverse the expression and try to get a list of parsed elements
+ // based on schema.
+ @Override
+ public void validate() throws PlacementConstraintParseException {
+ List<String> parsedElements = new ArrayList<>();
+ // expression should start with AND or OR
+ String op;
+ if (expression.startsWith(AND) ||
+ expression.startsWith(AND.toUpperCase())) {
+ op = AND;
+ } else if(expression.startsWith(OR) ||
+ expression.startsWith(OR.toUpperCase())) {
+ op = OR;
+ } else {
+ throw new PlacementConstraintParseException(
+ "Excepting starting with \"" + AND + "\" or \"" + OR + "\","
+ + " but met " + expression);
+ }
+ parsedElements.add(op);
+ Pattern p = Pattern.compile("\\((.*)\\)");
+ Matcher m = p.matcher(expression);
+ if (!m.find()) {
+ throw new PlacementConstraintParseException("Unexpected format,"
+ + " expecting [AND|OR](A:B...) "
+ + "but current expression is " + expression);
+ }
+ String childStrs = m.group(1);
+ MultipleConstraintsTokenizer ct =
+ new MultipleConstraintsTokenizer(childStrs);
+ ct.validate();
+ while(ct.hasMoreElements()) {
+ parsedElements.add(ct.nextElement());
+ }
+ this.iterator = parsedElements.iterator();
+ }
+
+ @Override
+ public boolean hasMoreElements() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public String nextElement() {
+ return iterator.next();
+ }
+ }
+
+ /**
+ * Tokenizer used to parse allocation tags expression, which should be
+ * in tag=numOfAllocations syntax.
+ */
+ public static class SourceTagsTokenizer implements ConstraintTokenizer {
+
+ private final String expression;
+ private StringTokenizer st;
+ private Iterator<String> iterator;
+ public SourceTagsTokenizer(String expr) {
+ this.expression = expr;
+ st = new StringTokenizer(expr, String.valueOf(KV_SPLIT_DELIM));
+ }
+
+ @Override
+ public void validate() throws PlacementConstraintParseException {
+ ArrayList<String> parsedValues = new ArrayList<>();
+ if (st.countTokens() != 2) {
+ throw new PlacementConstraintParseException(
+ "Expecting source allocation tag to be specified"
+ + " sourceTag=numOfAllocations syntax,"
+ + " but met " + expression);
+ }
+
+ String sourceTag = st.nextToken();
+ parsedValues.add(sourceTag);
+ String num = st.nextToken();
+ try {
+ Integer.parseInt(num);
+ parsedValues.add(num);
+ } catch (NumberFormatException e) {
+ throw new PlacementConstraintParseException("Value of the expression"
+ + " must be an integer, but met " + num);
+ }
+ iterator = parsedValues.iterator();
+ }
+
+ @Override
+ public boolean hasMoreElements() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public String nextElement() {
+ return iterator.next();
+ }
+ }
+
+ /**
+ * Tokenizer used to handle a placement spec composed by multiple
+ * constraint expressions. Each of them is delimited with the
+ * given delimiter, e.g ':'.
+ */
+ public static class MultipleConstraintsTokenizer
+ implements ConstraintTokenizer {
+
+ private final String expr;
+ private Iterator<String> iterator;
+
+ public MultipleConstraintsTokenizer(String expression) {
+ this.expr = expression;
+ }
+
+ @Override
+ public void validate() throws PlacementConstraintParseException {
+ ArrayList<String> parsedElements = new ArrayList<>();
+ char[] arr = expr.toCharArray();
+ // Memorize the location of each delimiter in a stack,
+ // removes invalid delimiters that embraced in brackets.
+ Stack<Integer> stack = new Stack<>();
+ for (int i=0; i<arr.length; i++) {
+ char current = arr[i];
+ switch (current) {
+ case EXPRESSION_DELIM:
+ stack.add(i);
+ break;
+ case BRACKET_START:
+ stack.add(i);
+ break;
+ case BRACKET_END:
+ while(!stack.isEmpty()) {
+ if (arr[stack.pop()] == BRACKET_START) {
+ break;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (stack.isEmpty()) {
+ // Single element
+ parsedElements.add(expr);
+ } else {
+ Iterator<Integer> it = stack.iterator();
+ int currentPos = 0;
+ while (it.hasNext()) {
+ int pos = it.next();
+ String sub = expr.substring(currentPos, pos);
+ if (sub != null && !sub.isEmpty()) {
+ parsedElements.add(sub);
+ }
+ currentPos = pos+1;
+ }
+ if (currentPos < expr.length()) {
+ parsedElements.add(expr.substring(currentPos, expr.length()));
+ }
+ }
+ iterator = parsedElements.iterator();
+ }
+
+ @Override
+ public boolean hasMoreElements() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public String nextElement() {
+ return iterator.next();
+ }
+ }
+
+ /**
+ * Constraint parser used to parse a given target expression, such as
+ * "NOTIN, NODE, foo, bar".
+ */
+ public static class TargetConstraintParser extends ConstraintParser {
+
+ public TargetConstraintParser(String expression) {
+ super(new BaseStringTokenizer(expression,
+ String.valueOf(EXPRESSION_VAL_DELIM)));
+ }
+
+ @Override
+ public AbstractConstraint parse()
+ throws PlacementConstraintParseException {
+ PlacementConstraint.AbstractConstraint placementConstraints;
+ String op = nextToken();
+ if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
+ String scope = nextToken();
+ scope = parseScope(scope);
+
+ Set<String> allocationTags = new TreeSet<>();
+ while(hasMoreTokens()) {
+ String tag = nextToken();
+ allocationTags.add(tag);
+ }
+ PlacementConstraint.TargetExpression target =
+ PlacementConstraints.PlacementTargets.allocationTag(
+ allocationTags.toArray(new String[allocationTags.size()]));
+ if (op.equalsIgnoreCase(IN)) {
+ placementConstraints = PlacementConstraints
+ .targetIn(scope, target);
+ } else {
+ placementConstraints = PlacementConstraints
+ .targetNotIn(scope, target);
+ }
+ } else {
+ throw new PlacementConstraintParseException(
+ "expecting " + IN + " or " + NOT_IN + ", but get " + op);
+ }
+ return placementConstraints;
+ }
+ }
+
+ /**
+ * Constraint parser used to parse a given target expression, such as
+ * "cardinality, NODE, foo, 0, 1".
+ */
+ public static class CardinalityConstraintParser extends ConstraintParser {
+
+ public CardinalityConstraintParser(String expr) {
+ super(new BaseStringTokenizer(expr,
+ String.valueOf(EXPRESSION_VAL_DELIM)));
+ }
+
+ @Override
+ public AbstractConstraint parse()
+ throws PlacementConstraintParseException {
+ String op = nextToken();
+ if (!op.equalsIgnoreCase(CARDINALITY)) {
+ throw new PlacementConstraintParseException("expecting " + CARDINALITY
+ + " , but met " + op);
+ }
+
+ shouldHaveNext();
+ String scope = nextToken();
+ scope = parseScope(scope);
+
+ Stack<String> resetElements = new Stack<>();
+ while(hasMoreTokens()) {
+ resetElements.add(nextToken());
+ }
+
+ // At least 3 elements
+ if (resetElements.size() < 3) {
+ throw new PlacementConstraintParseException(
+ "Invalid syntax for a cardinality expression, expecting"
+ + " \"cardinality,SCOPE,TARGET_TAG,...,TARGET_TAG,"
+ + "MIN_CARDINALITY,MAX_CARDINALITY\" at least 5 elements,"
+ + " but only " + (resetElements.size() + 2) + " is given.");
+ }
+
+ String maxCardinalityStr = resetElements.pop();
+ Integer max = toInt(maxCardinalityStr);
+
+ String minCardinalityStr = resetElements.pop();
+ Integer min = toInt(minCardinalityStr);
+
+ ArrayList<String> targetTags = new ArrayList<>();
+ while (!resetElements.empty()) {
+ targetTags.add(resetElements.pop());
+ }
+
+ return PlacementConstraints.cardinality(scope, min, max,
+ targetTags.toArray(new String[targetTags.size()]));
+ }
+ }
+
+ /**
+ * Parser used to parse conjunction form of constraints, such as
+ * AND(A, ..., B), OR(A, ..., B).
+ */
+ public static class ConjunctionConstraintParser extends ConstraintParser {
+
+ public ConjunctionConstraintParser(String expr) {
+ super(new ConjunctionTokenizer(expr));
+ }
+
+ @Override
+ public AbstractConstraint parse() throws PlacementConstraintParseException {
+ // do pre-process, validate input.
+ validate();
+ String op = nextToken();
+ shouldHaveNext();
+ List<AbstractConstraint> constraints = new ArrayList<>();
+ while(hasMoreTokens()) {
+ // each child expression can be any valid form of
+ // constraint expressions.
+ String constraintStr = nextToken();
+ AbstractConstraint constraint = parseExpression(constraintStr);
+ constraints.add(constraint);
+ }
+ if (AND.equalsIgnoreCase(op)) {
+ return PlacementConstraints.and(
+ constraints.toArray(
+ new AbstractConstraint[constraints.size()]));
+ } else if (OR.equalsIgnoreCase(op)) {
+ return PlacementConstraints.or(
+ constraints.toArray(
+ new AbstractConstraint[constraints.size()]));
+ } else {
+ throw new PlacementConstraintParseException(
+ "Unexpected conjunction operator : " + op
+ + ", expecting " + AND + " or " + OR);
+ }
+ }
+ }
+
+ /**
+ * A helper class to encapsulate source tags and allocations in the
+ * placement specification.
+ */
+ public static final class SourceTags {
+ private String tag;
+ private int num;
+
+ private SourceTags(String sourceTag, int number) {
+ this.tag = sourceTag;
+ this.num = number;
+ }
+
+ public String getTag() {
+ return this.tag;
+ }
+
+ public int getNumOfAllocations() {
+ return this.num;
+ }
+
+ /**
+ * Parses source tags from expression "sourceTags=numOfAllocations".
+ * @param expr
+ * @return source tags, see {@link SourceTags}
+ * @throws PlacementConstraintParseException
+ */
+ public static SourceTags parseFrom(String expr)
+ throws PlacementConstraintParseException {
+ SourceTagsTokenizer stt = new SourceTagsTokenizer(expr);
+ stt.validate();
+
+ // During validation we already checked the number of parsed elements.
+ String allocTag = stt.nextElement();
+ int allocNum = Integer.parseInt(stt.nextElement());
+ return new SourceTags(allocTag, allocNum);
+ }
+ }
+
+ /**
+ * Parses a given constraint expression to a {@link AbstractConstraint},
+ * this expression can be any valid form of constraint expressions.
+ *
+ * @param constraintStr expression string
+ * @return a parsed {@link AbstractConstraint}
+ * @throws PlacementConstraintParseException when given expression
+ * is malformed
+ */
+ public static AbstractConstraint parseExpression(String constraintStr)
+ throws PlacementConstraintParseException {
+ // Try parse given expression with all allowed constraint parsers,
+ // fails if no one could parse it.
+ TargetConstraintParser tp = new TargetConstraintParser(constraintStr);
+ Optional<AbstractConstraint> constraintOptional =
+ Optional.ofNullable(tp.tryParse());
+ if (!constraintOptional.isPresent()) {
+ CardinalityConstraintParser cp =
+ new CardinalityConstraintParser(constraintStr);
+ constraintOptional = Optional.ofNullable(cp.tryParse());
+ if (!constraintOptional.isPresent()) {
+ ConjunctionConstraintParser jp =
+ new ConjunctionConstraintParser(constraintStr);
+ constraintOptional = Optional.ofNullable(jp.tryParse());
+ }
+ if (!constraintOptional.isPresent()) {
+ throw new PlacementConstraintParseException(
+ "Invalid constraint expression " + constraintStr);
+ }
+ }
+ return constraintOptional.get();
+ }
+
+ /**
+ * Parses a placement constraint specification. A placement constraint spec
+ * is a composite expression which is composed by multiple sub constraint
+ * expressions delimited by ":". With following syntax:
+ *
+ * <p>Tag1=N1,P1:Tag2=N2,P2:...:TagN=Nn,Pn</p>
+ *
+ * where <b>TagN=Nn</b> is a key value pair to determine the source
+ * allocation tag and the number of allocations, such as:
+ *
+ * <p>foo=3</p>
+ *
+ * and where <b>Pn</b> can be any form of a valid constraint expression,
+ * such as:
+ *
+ * <ul>
+ * <li>in,node,foo,bar</li>
+ * <li>notin,node,foo,bar,1,2</li>
+ * <li>and(notin,node,foo:notin,node,bar)</li>
+ * </ul>
+ * @param expression expression string.
+ * @return a map of source tags to placement constraint mapping.
+ * @throws PlacementConstraintParseException
+ */
+ public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
+ String expression) throws PlacementConstraintParseException {
+ // Respect insertion order.
+ Map<SourceTags, PlacementConstraint> result = new LinkedHashMap<>();
+ PlacementConstraintParser.ConstraintTokenizer tokenizer =
+ new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
+ tokenizer.validate();
+ while(tokenizer.hasMoreElements()) {
+ String specStr = tokenizer.nextElement();
+ // each spec starts with sourceAllocationTag=numOfContainers and
+ // followed by a constraint expression.
+ // foo=4,Pn
+ String[] splitted = specStr.split(
+ String.valueOf(EXPRESSION_VAL_DELIM), 2);
+ if (splitted.length != 2) {
+ throw new PlacementConstraintParseException(
+ "Unexpected placement constraint expression " + specStr);
+ }
+
+ String tagAlloc = splitted[0];
+ SourceTags st = SourceTags.parseFrom(tagAlloc);
+ String exprs = splitted[1];
+ AbstractConstraint constraint =
+ PlacementConstraintParser.parseExpression(exprs);
+
+ result.put(st, constraint.build());
+ }
+
+ return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java
new file mode 100644
index 0000000..890d5ec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package org.apache.hadoop.yarn.util.constraint contains classes
+ * which is used as utility class for placement constraints.
+ */
+package org.apache.hadoop.yarn.util.constraint;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
new file mode 100644
index 0000000..941f971
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.resource;
+
+import com.google.common.collect.Sets;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.TargetConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.CardinalityConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConjunctionConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.MultipleConstraintsTokenizer;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Class to test placement constraint parser.
+ */
+public class TestPlacementConstraintParser {
+
+ @Test
+ public void testTargetExpressionParser()
+ throws PlacementConstraintParseException {
+ ConstraintParser parser;
+ AbstractConstraint constraint;
+ SingleConstraint single;
+
+ // Anti-affinity with single target tag
+ // NOTIN,NDOE,foo
+ parser = new TargetConstraintParser("NOTIN, NODE, foo");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof SingleConstraint);
+ single = (SingleConstraint) constraint;
+ Assert.assertEquals("node", single.getScope());
+ Assert.assertEquals(0, single.getMinCardinality());
+ Assert.assertEquals(0, single.getMaxCardinality());
+
+ // lower cases is also valid
+ parser = new TargetConstraintParser("notin, node, foo");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof SingleConstraint);
+ single = (SingleConstraint) constraint;
+ Assert.assertEquals("node", single.getScope());
+ Assert.assertEquals(0, single.getMinCardinality());
+ Assert.assertEquals(0, single.getMaxCardinality());
+
+ // Affinity with single target tag
+ // IN,NODE,foo
+ parser = new TargetConstraintParser("IN, NODE, foo");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof SingleConstraint);
+ single = (SingleConstraint) constraint;
+ Assert.assertEquals("node", single.getScope());
+ Assert.assertEquals(1, single.getMinCardinality());
+ Assert.assertEquals(Integer.MAX_VALUE, single.getMaxCardinality());
+
+ // Anti-affinity with multiple target tags
+ // NOTIN,NDOE,foo,bar,exp
+ parser = new TargetConstraintParser("NOTIN, NODE, foo, bar, exp");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof SingleConstraint);
+ single = (SingleConstraint) constraint;
+ Assert.assertEquals("node", single.getScope());
+ Assert.assertEquals(0, single.getMinCardinality());
+ Assert.assertEquals(0, single.getMaxCardinality());
+ Assert.assertEquals(1, single.getTargetExpressions().size());
+ TargetExpression exp =
+ single.getTargetExpressions().iterator().next();
+ Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
+ Assert.assertEquals(3, exp.getTargetValues().size());
+
+ // Invalid OP
+ parser = new TargetConstraintParser("XYZ, NODE, foo");
+ try {
+ parser.parse();
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof PlacementConstraintParseException);
+ Assert.assertTrue(e.getMessage().contains("expecting in or notin"));
+ }
+ }
+
+ @Test
+ public void testCardinalityConstraintParser()
+ throws PlacementConstraintParseException {
+ ConstraintParser parser;
+ AbstractConstraint constraint;
+ SingleConstraint single;
+
+ // cardinality,NODE,foo,0,1
+ parser = new CardinalityConstraintParser("cardinality, NODE, foo, 0, 1");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof SingleConstraint);
+ single = (SingleConstraint) constraint;
+ Assert.assertEquals("node", single.getScope());
+ Assert.assertEquals(0, single.getMinCardinality());
+ Assert.assertEquals(1, single.getMaxCardinality());
+ Assert.assertEquals(1, single.getTargetExpressions().size());
+ TargetExpression exp =
+ single.getTargetExpressions().iterator().next();
+ Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
+ Assert.assertEquals(1, exp.getTargetValues().size());
+ Assert.assertEquals("foo", exp.getTargetValues().iterator().next());
+
+ // cardinality,NODE,foo,bar,moo,0,1
+ parser = new CardinalityConstraintParser(
+ "cardinality,RACK,foo,bar,moo,0,1");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof SingleConstraint);
+ single = (SingleConstraint) constraint;
+ Assert.assertEquals("rack", single.getScope());
+ Assert.assertEquals(0, single.getMinCardinality());
+ Assert.assertEquals(1, single.getMaxCardinality());
+ Assert.assertEquals(1, single.getTargetExpressions().size());
+ exp = single.getTargetExpressions().iterator().next();
+ Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
+ Assert.assertEquals(3, exp.getTargetValues().size());
+ Set<String> expectedTags = Sets.newHashSet("foo", "bar", "moo");
+ Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues())
+ .isEmpty());
+
+ // Invalid scope string
+ try {
+ parser = new CardinalityConstraintParser(
+ "cardinality,NOWHERE,foo,bar,moo,0,1");
+ parser.parse();
+ Assert.fail("Expecting a parsing failure!");
+ } catch (PlacementConstraintParseException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("expecting scope to node or rack, but met NOWHERE"));
+ }
+
+ // Invalid number of expression elements
+ try {
+ parser = new CardinalityConstraintParser(
+ "cardinality,NODE,0,1");
+ parser.parse();
+ Assert.fail("Expecting a parsing failure!");
+ } catch (PlacementConstraintParseException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("at least 5 elements, but only 4 is given"));
+ }
+ }
+
+ @Test
+ public void testAndConstraintParser()
+ throws PlacementConstraintParseException {
+ ConstraintParser parser;
+ AbstractConstraint constraint;
+ And and;
+
+ parser = new ConjunctionConstraintParser(
+ "AND(NOTIN,NODE,foo:NOTIN,NODE,bar)");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof And);
+ and = (And) constraint;
+ Assert.assertEquals(2, and.getChildren().size());
+
+ parser = new ConjunctionConstraintParser(
+ "AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1)");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof And);
+ Assert.assertEquals(2, and.getChildren().size());
+
+ parser = new ConjunctionConstraintParser(
+ "AND(NOTIN,NODE,foo:AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1))");
+ constraint = parser.parse();
+ Assert.assertTrue(constraint instanceof And);
+ and = (And) constraint;
+ Assert.assertTrue(and.getChildren().get(0) instanceof SingleConstraint);
+ Assert.assertTrue(and.getChildren().get(1) instanceof And);
+ and = (And) and.getChildren().get(1);
+ Assert.assertEquals(2, and.getChildren().size());
+ }
+
+ @Test
+ public void testMultipleConstraintsTokenizer()
+ throws PlacementConstraintParseException {
+ MultipleConstraintsTokenizer ct;
+ SourceTagsTokenizer st;
+ TokenizerTester mp;
+
+ ct = new MultipleConstraintsTokenizer(
+ "foo=1,A1,A2,A3:bar=2,B1,B2:moo=3,C1,C2");
+ mp = new TokenizerTester(ct,
+ "foo=1,A1,A2,A3", "bar=2,B1,B2", "moo=3,C1,C2");
+ mp.verify();
+
+ ct = new MultipleConstraintsTokenizer(
+ "foo=1,AND(A2:A3):bar=2,OR(B1:AND(B2:B3)):moo=3,C1,C2");
+ mp = new TokenizerTester(ct,
+ "foo=1,AND(A2:A3)", "bar=2,OR(B1:AND(B2:B3))", "moo=3,C1,C2");
+ mp.verify();
+
+ ct = new MultipleConstraintsTokenizer("A:B:C");
+ mp = new TokenizerTester(ct, "A", "B", "C");
+ mp.verify();
+
+ ct = new MultipleConstraintsTokenizer("A:AND(B:C):D");
+ mp = new TokenizerTester(ct, "A", "AND(B:C)", "D");
+ mp.verify();
+
+ ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E");
+ mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
+ mp.verify();
+
+ ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E");
+ mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
+ mp.verify();
+
+ st = new SourceTagsTokenizer("A=4");
+ mp = new TokenizerTester(st, "A", "4");
+ mp.verify();
+
+ try {
+ st = new SourceTagsTokenizer("A=B");
+ mp = new TokenizerTester(st, "A", "B");
+ mp.verify();
+ Assert.fail("Expecting a parsing failure");
+ } catch (PlacementConstraintParseException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("Value of the expression must be an integer"));
+ }
+ }
+
+ private static class TokenizerTester {
+
+ private ConstraintTokenizer tokenizer;
+ private String[] expectedExtractions;
+
+ protected TokenizerTester(ConstraintTokenizer tk,
+ String... expctedStrings) {
+ this.tokenizer = tk;
+ this.expectedExtractions = expctedStrings;
+ }
+
+ void verify()
+ throws PlacementConstraintParseException {
+ tokenizer.validate();
+ int i = 0;
+ while (tokenizer.hasMoreElements()) {
+ String current = tokenizer.nextElement();
+ Assert.assertTrue(i < expectedExtractions.length);
+ Assert.assertEquals(expectedExtractions[i], current);
+ i++;
+ }
+ }
+ }
+
+ @Test
+ public void testParsePlacementSpec()
+ throws PlacementConstraintParseException {
+ Map<SourceTags, PlacementConstraint> result;
+ PlacementConstraint expectedPc1, expectedPc2;
+ PlacementConstraint actualPc1, actualPc2;
+ SourceTags tag1, tag2;
+
+ // A single anti-affinity constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpec("foo=3,notin,node,foo");
+ Assert.assertEquals(1, result.size());
+ tag1 = result.keySet().iterator().next();
+ Assert.assertEquals("foo", tag1.getTag());
+ Assert.assertEquals(3, tag1.getNumOfAllocations());
+ expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1);
+
+ // Upper case
+ result = PlacementConstraintParser
+ .parsePlacementSpec("foo=3,NOTIN,NODE,foo");
+ Assert.assertEquals(1, result.size());
+ tag1 = result.keySet().iterator().next();
+ Assert.assertEquals("foo", tag1.getTag());
+ Assert.assertEquals(3, tag1.getNumOfAllocations());
+ expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
+ actualPc1 = result.values().iterator().next();
+ Assert.assertEquals(expectedPc1, actualPc1);
+
+ // A single cardinality constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpec("foo=10,cardinality,node,foo,bar,0,100");
+ Assert.assertEquals(1, result.size());
+ tag1 = result.keySet().iterator().next();
+ Assert.assertEquals("foo", tag1.getTag());
+ Assert.assertEquals(10, tag1.getNumOfAllocations());
+ expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build();
+ Assert.assertEquals(expectedPc1, result.values().iterator().next());
+
+ // Two constraint expressions
+ result = PlacementConstraintParser
+ .parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo");
+ Assert.assertEquals(2, result.size());
+ Iterator<SourceTags> keyIt = result.keySet().iterator();
+ tag1 = keyIt.next();
+ Assert.assertEquals("foo", tag1.getTag());
+ Assert.assertEquals(3, tag1.getNumOfAllocations());
+ tag2 = keyIt.next();
+ Assert.assertEquals("bar", tag2.getTag());
+ Assert.assertEquals(2, tag2.getNumOfAllocations());
+ Iterator<PlacementConstraint> valueIt = result.values().iterator();
+ expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
+ expectedPc2 = targetIn("node", allocationTag("foo")).build();
+ Assert.assertEquals(expectedPc1, valueIt.next());
+ Assert.assertEquals(expectedPc2, valueIt.next());
+
+ // And constraint
+ result = PlacementConstraintParser
+ .parsePlacementSpec("foo=1000,and(notin,node,bar:in,node,foo)");
+ Assert.assertEquals(1, result.size());
+ keyIt = result.keySet().iterator();
+ tag1 = keyIt.next();
+ Assert.assertEquals("foo", tag1.getTag());
+ Assert.assertEquals(1000, tag1.getNumOfAllocations());
+ actualPc1 = result.values().iterator().next();
+ expectedPc1 = and(targetNotIn("node", allocationTag("bar")),
+ targetIn("node", allocationTag("foo"))).build();
+ Assert.assertEquals(expectedPc1, actualPc1);
+
+ // Multiple constraints with nested forms.
+ result = PlacementConstraintParser.parsePlacementSpec(
+ "foo=1000,and(notin,node,bar:or(in,node,foo:in,node,moo))"
+ + ":bar=200,notin,node,foo");
+ Assert.assertEquals(2, result.size());
+ keyIt = result.keySet().iterator();
+ tag1 = keyIt.next();
+ tag2 = keyIt.next();
+ Assert.assertEquals("foo", tag1.getTag());
+ Assert.assertEquals(1000, tag1.getNumOfAllocations());
+ Assert.assertEquals("bar", tag2.getTag());
+ Assert.assertEquals(200, tag2.getNumOfAllocations());
+ valueIt = result.values().iterator();
+ actualPc1 = valueIt.next();
+ actualPc2 = valueIt.next();
+
+ expectedPc1 = and(targetNotIn("node", allocationTag("bar")),
+ or(targetIn("node", allocationTag("foo")),
+ targetIn("node", allocationTag("moo")))).build();
+ Assert.assertEquals(actualPc1, expectedPc1);
+ expectedPc2 = targetNotIn("node", allocationTag("foo")).build();
+ Assert.assertEquals(expectedPc2, actualPc2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 9ba2138..a06ee7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -28,6 +28,7 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
+import java.util.Base64;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -671,8 +673,14 @@ public class ApplicationMaster {
}
private void parsePlacementSpecs(String placementSpecifications) {
+ // Client sends placement spec in encoded format
+ Base64.Decoder decoder = Base64.getDecoder();
+ byte[] decodedBytes = decoder.decode(
+ placementSpecifications.getBytes(StandardCharsets.UTF_8));
+ String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
+ LOG.info("Decode placement spec: " + decodedSpec);
Map<String, PlacementSpec> pSpecs =
- PlacementSpec.parse(placementSpecifications);
+ PlacementSpec.parse(decodedSpec);
this.placementSpecs = new HashMap<>();
this.numTotalContainers = 0;
for (PlacementSpec pSpec : pSpecs.values()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 0aef83f..ac58662 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.Arrays;
+import java.util.Base64;
import com.google.common.base.Joiner;
import org.apache.commons.cli.CommandLine;
@@ -857,7 +859,11 @@ public class Client {
}
vargs.add("--num_containers " + String.valueOf(numContainers));
if (placementSpec != null && placementSpec.length() > 0) {
- vargs.add("--placement_spec " + placementSpec);
+ // Encode the spec to avoid passing special chars via shell arguments.
+ String encodedSpec = Base64.getEncoder()
+ .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
+ LOG.info("Encode placement spec: " + encodedSpec);
+ vargs.add("--placement_spec " + encodedSpec);
}
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08c0488/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
index ed13ee0..2909259 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-import java.util.Scanner;
/**
* Class encapsulating a SourceTag, number of container and a Placement
@@ -34,12 +35,6 @@ public class PlacementSpec {
private static final Logger LOG =
LoggerFactory.getLogger(PlacementSpec.class);
- private static final String SPEC_DELIM = ":";
- private static final String KV_SPLIT_DELIM = "=";
- private static final String SPEC_VAL_DELIM = ",";
- private static final String IN = "in";
- private static final String NOT_IN = "notin";
- private static final String CARDINALITY = "cardinality";
public final String sourceTag;
public final int numContainers;
@@ -73,65 +68,28 @@ public class PlacementSpec {
* @param specs Placement spec.
* @return Mapping from source tag to placement constraint.
*/
- public static Map<String, PlacementSpec> parse(String specs) {
+ public static Map<String, PlacementSpec> parse(String specs)
+ throws IllegalArgumentException {
LOG.info("Parsing Placement Specs: [{}]", specs);
- Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM);
Map<String, PlacementSpec> pSpecs = new HashMap<>();
- while (s.hasNext()) {
- String sp = s.next();
- LOG.info("Parsing Spec: [{}]", sp);
- String[] specSplit = sp.split(KV_SPLIT_DELIM);
- String sourceTag = specSplit[0];
- Scanner ps = new Scanner(specSplit[1]).useDelimiter(SPEC_VAL_DELIM);
- int numContainers = ps.nextInt();
- if (!ps.hasNext()) {
- pSpecs.put(sourceTag,
- new PlacementSpec(sourceTag, numContainers, null));
- LOG.info("Creating Spec without constraint {}: num[{}]",
- sourceTag, numContainers);
- continue;
+ Map<SourceTags, PlacementConstraint> parsed;
+ try {
+ parsed = PlacementConstraintParser.parsePlacementSpec(specs);
+ for (Map.Entry<SourceTags, PlacementConstraint> entry :
+ parsed.entrySet()) {
+ LOG.info("Parsed source tag: {}, number of allocations: {}",
+ entry.getKey().getTag(), entry.getKey().getNumOfAllocations());
+ LOG.info("Parsed constraint: {}", entry.getValue()
+ .getConstraintExpr().getClass().getSimpleName());
+ pSpecs.put(entry.getKey().getTag(), new PlacementSpec(
+ entry.getKey().getTag(),
+ entry.getKey().getNumOfAllocations(),
+ entry.getValue()));
}
- String cType = ps.next().toLowerCase();
- String scope = ps.next().toLowerCase();
-
- String targetTag = ps.next();
- scope = scope.equals("rack") ? PlacementConstraints.RACK :
- PlacementConstraints.NODE;
-
- PlacementConstraint pc;
- if (cType.equals(IN)) {
- pc = PlacementConstraints.build(
- PlacementConstraints.targetIn(scope,
- PlacementConstraints.PlacementTargets.allocationTag(
- targetTag)));
- LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " +
- "scope[{}], target[{}]",
- sourceTag, numContainers, scope, targetTag);
- } else if (cType.equals(NOT_IN)) {
- pc = PlacementConstraints.build(
- PlacementConstraints.targetNotIn(scope,
- PlacementConstraints.PlacementTargets.allocationTag(
- targetTag)));
- LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " +
- "scope[{}], target[{}]",
- sourceTag, numContainers, scope, targetTag);
- } else if (cType.equals(CARDINALITY)) {
- int minCard = ps.nextInt();
- int maxCard = ps.nextInt();
- pc = PlacementConstraints.build(
- PlacementConstraints.targetCardinality(scope, minCard, maxCard,
- PlacementConstraints.PlacementTargets.allocationTag(
- targetTag)));
- LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " +
- "scope[{}], min[{}], max[{}], target[{}]",
- sourceTag, numContainers, scope, minCard, maxCard, targetTag);
- } else {
- throw new RuntimeException(
- "Could not parse constraintType [" + cType + "]" +
- " in [" + specSplit[1] + "]");
- }
- pSpecs.put(sourceTag, new PlacementSpec(sourceTag, numContainers, pc));
+ return pSpecs;
+ } catch (PlacementConstraintParseException e) {
+ throw new IllegalArgumentException(
+ "Invalid placement spec: " + specs, e);
}
- return pSpecs;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org