You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:55:59 UTC
[29/37] hadoop git commit: YARN-7745. Allow DistributedShell to take
a placement specification for containers it wants to launch. (Arun Suresh via
wangda)
YARN-7745. Allow DistributedShell to take a placement specification for containers it wants to launch. (Arun Suresh via wangda)
Change-Id: Ided146d662e944a8a4692e5d6885f23fd9bbcad5
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e60f5129
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e60f5129
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e60f5129
Branch: refs/heads/YARN-6592
Commit: e60f51299dba360d13aa39f9ab714fdfc666b532
Parents: 38af237
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Jan 18 14:22:45 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800
----------------------------------------------------------------------
.../distributedshell/ApplicationMaster.java | 124 +++++++++++++++--
.../applications/distributedshell/Client.java | 14 ++
.../distributedshell/PlacementSpec.java | 137 +++++++++++++++++++
3 files changed, 263 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e60f5129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 270ef1b..9ba2138 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -87,8 +88,11 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -99,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -274,6 +279,10 @@ public class ApplicationMaster {
@VisibleForTesting
protected AtomicInteger numRequestedContainers = new AtomicInteger();
+ protected AtomicInteger numIgnore = new AtomicInteger();
+
+ protected AtomicInteger totalRetries = new AtomicInteger(10);
+
// Shell command to be executed
private String shellCommand = "";
// Args to be passed to the shell command
@@ -289,6 +298,9 @@ public class ApplicationMaster {
// File length needed for local resource
private long shellScriptPathLen = 0;
+ // Placement Specifications
+ private Map<String, PlacementSpec> placementSpecs = null;
+
// Container retry options
private ContainerRetryPolicy containerRetryPolicy =
ContainerRetryPolicy.NEVER_RETRY;
@@ -334,6 +346,7 @@ public class ApplicationMaster {
private final String windows_command = "cmd /c";
private int yarnShellIdCounter = 1;
+ private final AtomicLong allocIdCounter = new AtomicLong(1);
@VisibleForTesting
protected final Set<ContainerId> launchedContainers =
@@ -457,6 +470,7 @@ public class ApplicationMaster {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("placement_spec", true, "Placement specification");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@@ -487,6 +501,17 @@ public class ApplicationMaster {
dumpOutDebugInfo();
}
+ if (cliParser.hasOption("placement_spec")) {
+ String placementSpec = cliParser.getOptionValue("placement_spec");
+ LOG.info("Placement Spec received [{}]", placementSpec);
+ parsePlacementSpecs(placementSpec);
+ LOG.info("Total num containers requested [{}]", numTotalContainers);
+ if (numTotalContainers == 0) {
+ throw new IllegalArgumentException(
+ "Cannot run distributed shell with no containers");
+ }
+ }
+
Map<String, String> envs = System.getenv();
if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
@@ -609,8 +634,11 @@ public class ApplicationMaster {
}
containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", "");
- numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
- "num_containers", "1"));
+
+ if (this.placementSpecs == null) {
+ numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+ "num_containers", "1"));
+ }
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
@@ -642,6 +670,17 @@ public class ApplicationMaster {
return true;
}
+ /**
+  * Parses the given placement specification string and records the result
+  * on this instance: {@code placementSpecs} is replaced with a fresh map
+  * keyed by source tag, and {@code numTotalContainers} becomes the sum of
+  * the container counts of all parsed specs.
+  *
+  * @param placementSpecifications raw value of the "placement_spec" option
+  */
+ private void parsePlacementSpecs(String placementSpecifications) {
+   Map<String, PlacementSpec> specsByTag = new HashMap<>();
+   int containerTotal = 0;
+   for (PlacementSpec parsed
+       : PlacementSpec.parse(placementSpecifications).values()) {
+     containerTotal += parsed.numContainers;
+     specsByTag.put(parsed.sourceTag, parsed);
+   }
+   // Publish both results only after every spec has been processed.
+   this.placementSpecs = specsByTag;
+   this.numTotalContainers = containerTotal;
+ }
+
/**
* Helper function to print usage
*
@@ -719,9 +758,19 @@ public class ApplicationMaster {
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
+ Map<Set<String>, PlacementConstraint> placementConstraintMap = null;
+ if (this.placementSpecs != null) {
+ placementConstraintMap = new HashMap<>();
+ for (PlacementSpec spec : this.placementSpecs.values()) {
+ if (spec.constraint != null) {
+ placementConstraintMap.put(
+ Collections.singleton(spec.sourceTag), spec.constraint);
+ }
+ }
+ }
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- appMasterTrackingUrl);
+ appMasterTrackingUrl, placementConstraintMap);
resourceProfiles = response.getResourceProfiles();
ResourceUtils.reinitializeResources(response.getResourceTypes());
// Dump out information about cluster capability as seen by the
@@ -765,9 +814,20 @@ public class ApplicationMaster {
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
- for (int i = 0; i < numTotalContainersToRequest; ++i) {
- ContainerRequest containerAsk = setupContainerAskForRM();
- amRMClient.addContainerRequest(containerAsk);
+ if (this.placementSpecs == null) {
+ for (int i = 0; i < numTotalContainersToRequest; ++i) {
+ ContainerRequest containerAsk = setupContainerAskForRM();
+ amRMClient.addContainerRequest(containerAsk);
+ }
+ } else {
+ List<SchedulingRequest> schedReqs = new ArrayList<>();
+ for (PlacementSpec pSpec : this.placementSpecs.values()) {
+ for (int i = 0; i < pSpec.numContainers; i++) {
+ SchedulingRequest sr = setupSchedulingRequest(pSpec);
+ schedReqs.add(sr);
+ }
+ }
+ amRMClient.addSchedulingRequests(schedReqs);
}
numRequestedContainers.set(numTotalContainers);
}
@@ -933,6 +993,12 @@ public class ApplicationMaster {
numRequestedContainers.decrementAndGet();
// we do not need to release the container as it would be done
// by the RM
+
+ // Ignore these containers if placementspec is enabled
+ // for the time being.
+ if (placementSpecs != null) {
+ numIgnore.incrementAndGet();
+ }
}
} else {
// nothing to do
@@ -962,14 +1028,18 @@ public class ApplicationMaster {
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
- if (askCount > 0) {
- for (int i = 0; i < askCount; ++i) {
- ContainerRequest containerAsk = setupContainerAskForRM();
- amRMClient.addContainerRequest(containerAsk);
+ // Don't bother re-asking if we are using placementSpecs
+ if (placementSpecs == null) {
+ if (askCount > 0) {
+ for (int i = 0; i < askCount; ++i) {
+ ContainerRequest containerAsk = setupContainerAskForRM();
+ amRMClient.addContainerRequest(containerAsk);
+ }
}
}
-
- if (numCompletedContainers.get() == numTotalContainers) {
+
+ if (numCompletedContainers.get() + numIgnore.get() >=
+ numTotalContainers) {
done = true;
}
}
@@ -1029,6 +1099,23 @@ public class ApplicationMaster {
}
@Override
+ public void onRequestsRejected(List<RejectedSchedulingRequest> rejReqs) {
+   // Callback for scheduling requests that were rejected: every rejected
+   // request is logged and collected so that it can be submitted again,
+   // as long as the shared retry budget (totalRetries) is not used up.
+   List<SchedulingRequest> reqsToRetry = new ArrayList<>();
+   for (RejectedSchedulingRequest rejReq : rejReqs) {
+     LOG.info("Scheduling Request {} has been rejected. Reason {}",
+         rejReq.getRequest(), rejReq.getReason());
+     reqsToRetry.add(rejReq.getRequest());
+   }
+   // Each rejected request costs one retry. Decide on the value returned
+   // by addAndGet rather than on a second get(), so the decrement and the
+   // exhaustion check observe the same counter value.
+   int retriesLeft = totalRetries.addAndGet(-reqsToRetry.size());
+   if (retriesLeft <= 0) {
+     LOG.info("Exiting, since retries are exhausted !!");
+     done = true;
+   } else {
+     amRMClient.addSchedulingRequests(reqsToRetry);
+   }
+ }
+
+ @Override
public void onShutdownRequest() {
done = true;
}
@@ -1335,6 +1422,19 @@ public class ApplicationMaster {
return request;
}
+ /**
+  * Builds one {@link SchedulingRequest} for the given placement spec.
+  * The request gets the next value of {@code allocIdCounter} as its
+  * allocation id, the configured {@code requestPriority}, the spec's source
+  * tag as its only tag, and the spec's constraint (which may be null).
+  *
+  * @param spec placement spec supplying the source tag and the constraint
+  * @return the newly created scheduling request
+  */
+ private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) {
+   long allocId = allocIdCounter.incrementAndGet();
+   SchedulingRequest sReq = SchedulingRequest.newInstance(
+       allocId, Priority.newInstance(requestPriority),
+       ExecutionTypeRequest.newInstance(),
+       Collections.singleton(spec.sourceTag),
+       ResourceSizing.newInstance(
+           createProfileCapability().getProfileCapabilityOverride()), null);
+   sReq.setPlacementConstraint(spec.constraint);
+   // Parameterized form, matching the other log statements in this class.
+   LOG.info("Scheduling Request made: {}", sReq);
+   return sReq;
+ }
+
private boolean fileExist(String filePath) {
return new File(filePath).exists();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e60f5129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index ef635d3..2aafa94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -188,6 +188,8 @@ public class Client {
// Whether to auto promote opportunistic containers
private boolean autoPromoteContainers = false;
+ // Placement specification
+ private String placementSpec = "";
// log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = "";
@@ -366,6 +368,10 @@ public class Client {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("placement_spec", true,
+ "Placement specification. Please note, if this option is specified,"
+ + " The \"num_containers\" option will be ignored. All requested"
+ + " containers will be of type GUARANTEED" );
}
/**
@@ -419,6 +425,11 @@ public class Client {
keepContainers = true;
}
+ if (cliParser.hasOption("placement_spec")) {
+ placementSpec = cliParser.getOptionValue("placement_spec");
+ // Check if it is parsable
+ PlacementSpec.parse(this.placementSpec);
+ }
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
@@ -834,6 +845,9 @@ public class Client {
vargs.add("--container_resource_profile " + containerResourceProfile);
}
vargs.add("--num_containers " + String.valueOf(numContainers));
+ if (placementSpec != null && placementSpec.length() > 0) {
+ vargs.add("--placement_spec " + placementSpec);
+ }
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e60f5129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
new file mode 100644
index 0000000..ed13ee0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Immutable holder tying a source tag to the number of containers requested
+ * for that tag and an optional {@link PlacementConstraint}.
+ */
+public class PlacementSpec {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PlacementSpec.class);
+  // Separates one "SourceTag=Constraint" entry from the next.
+  private static final String SPEC_DELIM = ":";
+  // Separates the source tag from its constraint description.
+  private static final String KV_SPLIT_DELIM = "=";
+  // Separates the individual fields of a constraint description.
+  private static final String SPEC_VAL_DELIM = ",";
+  private static final String IN = "in";
+  private static final String NOT_IN = "notin";
+  private static final String CARDINALITY = "cardinality";
+
+  public final String sourceTag;
+  public final int numContainers;
+  public final PlacementConstraint constraint;
+
+  /**
+   * Creates a placement spec.
+   *
+   * @param sourceTag tag identifying the containers of this spec
+   * @param numContainers number of containers requested for the tag
+   * @param constraint constraint for the tag; null when none was given
+   */
+  public PlacementSpec(String sourceTag, int numContainers,
+      PlacementConstraint constraint) {
+    this.sourceTag = sourceTag;
+    this.numContainers = numContainers;
+    this.constraint = constraint;
+  }
+
+  // Placement specification should be of the form:
+  // PlacementSpec => ""|KeyVal:PlacementSpec
+  // KeyVal => SourceTag=Constraint
+  // SourceTag => String
+  // Constraint => NumContainers|
+  //               NumContainers,"in",Scope,TargetTag|
+  //               NumContainers,"notin",Scope,TargetTag|
+  //               NumContainers,"cardinality",Scope,TargetTag,MinCard,MaxCard
+  // NumContainers => int (number of containers)
+  // Scope => "NODE"|"RACK" (any value other than "rack" is treated as NODE)
+  // TargetTag => String (Target Tag)
+  // MinCard => int (min cardinality - needed if ConstraintType == cardinality)
+  // MaxCard => int (max cardinality - needed if ConstraintType == cardinality)
+
+  /**
+   * Parser to convert a string representation of a placement spec to mapping
+   * from source tag to Placement Spec. When a source tag occurs more than
+   * once, the last entry for it wins.
+   *
+   * @param specs Placement spec; entries are separated by ":".
+   * @return Mapping from source tag to placement spec.
+   * @throws IllegalArgumentException if an entry has no "=Constraint" part
+   * @throws RuntimeException if the constraint type is not one of
+   *     "in", "notin" or "cardinality"
+   * @throws java.util.NoSuchElementException if a numeric field is malformed
+   *     or a required field is missing (raised by the underlying Scanner)
+   */
+  public static Map<String, PlacementSpec> parse(String specs) {
+    LOG.info("Parsing Placement Specs: [{}]", specs);
+    Map<String, PlacementSpec> pSpecs = new HashMap<>();
+    try (Scanner s = new Scanner(specs)) {
+      s.useDelimiter(SPEC_DELIM);
+      while (s.hasNext()) {
+        PlacementSpec pSpec = parseSpec(s.next());
+        pSpecs.put(pSpec.sourceTag, pSpec);
+      }
+    }
+    return pSpecs;
+  }
+
+  /**
+   * Parses a single "SourceTag=Constraint" entry.
+   *
+   * @param sp one entry of the placement specification
+   * @return the placement spec described by the entry
+   */
+  private static PlacementSpec parseSpec(String sp) {
+    LOG.info("Parsing Spec: [{}]", sp);
+    String[] specSplit = sp.split(KV_SPLIT_DELIM);
+    if (specSplit.length < 2) {
+      // Fail with a readable message instead of an index-out-of-bounds.
+      throw new IllegalArgumentException(
+          "Could not parse spec [" + sp + "], expected SourceTag"
+              + KV_SPLIT_DELIM + "Constraint");
+    }
+    String sourceTag = specSplit[0];
+    try (Scanner ps = new Scanner(specSplit[1])) {
+      ps.useDelimiter(SPEC_VAL_DELIM);
+      int numContainers = ps.nextInt();
+      if (!ps.hasNext()) {
+        // Only a container count was given: no constraint for this tag.
+        LOG.info("Creating Spec without constraint {}: num[{}]",
+            sourceTag, numContainers);
+        return new PlacementSpec(sourceTag, numContainers, null);
+      }
+      // Lower-case with a fixed locale so that keyword matching does not
+      // depend on the JVM default locale.
+      String cType = ps.next().toLowerCase(java.util.Locale.ROOT);
+      String scope = ps.next().toLowerCase(java.util.Locale.ROOT);
+
+      String targetTag = ps.next();
+      scope = "rack".equals(scope) ? PlacementConstraints.RACK :
+          PlacementConstraints.NODE;
+
+      PlacementConstraint pc;
+      if (cType.equals(IN)) {
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetIn(scope,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " +
+            "scope[{}], target[{}]",
+            sourceTag, numContainers, scope, targetTag);
+      } else if (cType.equals(NOT_IN)) {
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(scope,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " +
+            "scope[{}], target[{}]",
+            sourceTag, numContainers, scope, targetTag);
+      } else if (cType.equals(CARDINALITY)) {
+        int minCard = ps.nextInt();
+        int maxCard = ps.nextInt();
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetCardinality(scope, minCard, maxCard,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " +
+            "scope[{}], min[{}], max[{}], target[{}]",
+            sourceTag, numContainers, scope, minCard, maxCard, targetTag);
+      } else {
+        throw new RuntimeException(
+            "Could not parse constraintType [" + cType + "]" +
+                " in [" + specSplit[1] + "]");
+      }
+      return new PlacementSpec(sourceTag, numContainers, pc);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org