Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:55:59 UTC

[29/37] hadoop git commit: YARN-7745. Allow DistributedShell to take a placement specification for containers it wants to launch. (Arun Suresh via wangda)

YARN-7745. Allow DistributedShell to take a placement specification for containers it wants to launch. (Arun Suresh via wangda)

Change-Id: Ided146d662e944a8a4692e5d6885f23fd9bbcad5


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e60f5129
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e60f5129
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e60f5129

Branch: refs/heads/YARN-6592
Commit: e60f51299dba360d13aa39f9ab714fdfc666b532
Parents: 38af237
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Jan 18 14:22:45 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800

----------------------------------------------------------------------
 .../distributedshell/ApplicationMaster.java     | 124 +++++++++++++++--
 .../applications/distributedshell/Client.java   |  14 ++
 .../distributedshell/PlacementSpec.java         | 137 +++++++++++++++++++
 3 files changed, 263 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e60f5129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 270ef1b..9ba2138 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -87,8 +88,11 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ProfileCapability;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -99,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -274,6 +279,10 @@ public class ApplicationMaster {
   @VisibleForTesting
   protected AtomicInteger numRequestedContainers = new AtomicInteger();
 
+  protected AtomicInteger numIgnore = new AtomicInteger();
+
+  protected AtomicInteger totalRetries = new AtomicInteger(10);
+
   // Shell command to be executed
   private String shellCommand = "";
   // Args to be passed to the shell command
@@ -289,6 +298,9 @@ public class ApplicationMaster {
   // File length needed for local resource
   private long shellScriptPathLen = 0;
 
+  // Placement Specifications
+  private Map<String, PlacementSpec> placementSpecs = null;
+
   // Container retry options
   private ContainerRetryPolicy containerRetryPolicy =
       ContainerRetryPolicy.NEVER_RETRY;
@@ -334,6 +346,7 @@ public class ApplicationMaster {
   private final String windows_command = "cmd /c";
 
   private int yarnShellIdCounter = 1;
+  private final AtomicLong allocIdCounter = new AtomicLong(1);
 
   @VisibleForTesting
   protected final Set<ContainerId> launchedContainers =
@@ -457,6 +470,7 @@ public class ApplicationMaster {
         "If container could retry, it specifies max retires");
     opts.addOption("container_retry_interval", true,
         "Interval between each retry, unit is milliseconds");
+    opts.addOption("placement_spec", true, "Placement specification");
     opts.addOption("debug", false, "Dump out debug information");
 
     opts.addOption("help", false, "Print usage");
@@ -487,6 +501,17 @@ public class ApplicationMaster {
       dumpOutDebugInfo();
     }
 
+    if (cliParser.hasOption("placement_spec")) {
+      String placementSpec = cliParser.getOptionValue("placement_spec");
+      LOG.info("Placement Spec received [{}]", placementSpec);
+      parsePlacementSpecs(placementSpec);
+      LOG.info("Total num containers requested [{}]", numTotalContainers);
+      if (numTotalContainers == 0) {
+        throw new IllegalArgumentException(
+            "Cannot run distributed shell with no containers");
+      }
+    }
+
     Map<String, String> envs = System.getenv();
 
     if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
@@ -609,8 +634,11 @@ public class ApplicationMaster {
     }
     containerResourceProfile =
         cliParser.getOptionValue("container_resource_profile", "");
-    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
-        "num_containers", "1"));
+
+    if (this.placementSpecs == null) {
+      numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+          "num_containers", "1"));
+    }
     if (numTotalContainers == 0) {
       throw new IllegalArgumentException(
           "Cannot run distributed shell with no containers");
@@ -642,6 +670,17 @@ public class ApplicationMaster {
     return true;
   }
 
+  private void parsePlacementSpecs(String placementSpecifications) {
+    Map<String, PlacementSpec> pSpecs =
+        PlacementSpec.parse(placementSpecifications);
+    this.placementSpecs = new HashMap<>();
+    this.numTotalContainers = 0;
+    for (PlacementSpec pSpec : pSpecs.values()) {
+      this.numTotalContainers += pSpec.numContainers;
+      this.placementSpecs.put(pSpec.sourceTag, pSpec);
+    }
+  }
+
   /**
    * Helper function to print usage
    *
@@ -719,9 +758,19 @@ public class ApplicationMaster {
     // Register self with ResourceManager
     // This will start heartbeating to the RM
     appMasterHostname = NetUtils.getHostname();
+    Map<Set<String>, PlacementConstraint> placementConstraintMap = null;
+    if (this.placementSpecs != null) {
+      placementConstraintMap = new HashMap<>();
+      for (PlacementSpec spec : this.placementSpecs.values()) {
+        if (spec.constraint != null) {
+          placementConstraintMap.put(
+              Collections.singleton(spec.sourceTag), spec.constraint);
+        }
+      }
+    }
     RegisterApplicationMasterResponse response = amRMClient
         .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
-            appMasterTrackingUrl);
+            appMasterTrackingUrl, placementConstraintMap);
     resourceProfiles = response.getResourceProfiles();
     ResourceUtils.reinitializeResources(response.getResourceTypes());
     // Dump out information about cluster capability as seen by the
@@ -765,9 +814,20 @@ public class ApplicationMaster {
     // containers
     // Keep looping until all the containers are launched and shell script
     // executed on them ( regardless of success/failure).
-    for (int i = 0; i < numTotalContainersToRequest; ++i) {
-      ContainerRequest containerAsk = setupContainerAskForRM();
-      amRMClient.addContainerRequest(containerAsk);
+    if (this.placementSpecs == null) {
+      for (int i = 0; i < numTotalContainersToRequest; ++i) {
+        ContainerRequest containerAsk = setupContainerAskForRM();
+        amRMClient.addContainerRequest(containerAsk);
+      }
+    } else {
+      List<SchedulingRequest> schedReqs = new ArrayList<>();
+      for (PlacementSpec pSpec : this.placementSpecs.values()) {
+        for (int i = 0; i < pSpec.numContainers; i++) {
+          SchedulingRequest sr = setupSchedulingRequest(pSpec);
+          schedReqs.add(sr);
+        }
+      }
+      amRMClient.addSchedulingRequests(schedReqs);
     }
     numRequestedContainers.set(numTotalContainers);
   }
@@ -933,6 +993,12 @@ public class ApplicationMaster {
             numRequestedContainers.decrementAndGet();
             // we do not need to release the container as it would be done
             // by the RM
+
+            // Ignore these containers if placementspec is enabled
+            // for the time being.
+            if (placementSpecs != null) {
+              numIgnore.incrementAndGet();
+            }
           }
         } else {
           // nothing to do
@@ -962,14 +1028,18 @@ public class ApplicationMaster {
       int askCount = numTotalContainers - numRequestedContainers.get();
       numRequestedContainers.addAndGet(askCount);
 
-      if (askCount > 0) {
-        for (int i = 0; i < askCount; ++i) {
-          ContainerRequest containerAsk = setupContainerAskForRM();
-          amRMClient.addContainerRequest(containerAsk);
+      // Don't bother re-asking if we are using placementSpecs
+      if (placementSpecs == null) {
+        if (askCount > 0) {
+          for (int i = 0; i < askCount; ++i) {
+            ContainerRequest containerAsk = setupContainerAskForRM();
+            amRMClient.addContainerRequest(containerAsk);
+          }
         }
       }
-      
-      if (numCompletedContainers.get() == numTotalContainers) {
+
+      if (numCompletedContainers.get() + numIgnore.get() >=
+          numTotalContainers) {
         done = true;
       }
     }
@@ -1029,6 +1099,23 @@ public class ApplicationMaster {
     }
 
     @Override
+    public void onRequestsRejected(List<RejectedSchedulingRequest> rejReqs) {
+      List<SchedulingRequest> reqsToRetry = new ArrayList<>();
+      for (RejectedSchedulingRequest rejReq : rejReqs) {
+        LOG.info("Scheduling Request {} has been rejected. Reason {}",
+            rejReq.getRequest(), rejReq.getReason());
+        reqsToRetry.add(rejReq.getRequest());
+      }
+      totalRetries.addAndGet(-1 * reqsToRetry.size());
+      if (totalRetries.get() <= 0) {
+        LOG.info("Exiting, since retries are exhausted !!");
+        done = true;
+      } else {
+        amRMClient.addSchedulingRequests(reqsToRetry);
+      }
+    }
+
+    @Override
     public void onShutdownRequest() {
       done = true;
     }
@@ -1335,6 +1422,19 @@ public class ApplicationMaster {
     return request;
   }
 
+  private SchedulingRequest setupSchedulingRequest(PlacementSpec spec) {
+    long allocId = allocIdCounter.incrementAndGet();
+    SchedulingRequest sReq = SchedulingRequest.newInstance(
+        allocId, Priority.newInstance(requestPriority),
+        ExecutionTypeRequest.newInstance(),
+        Collections.singleton(spec.sourceTag),
+        ResourceSizing.newInstance(
+            createProfileCapability().getProfileCapabilityOverride()), null);
+    sReq.setPlacementConstraint(spec.constraint);
+    LOG.info("Scheduling Request made: " + sReq.toString());
+    return sReq;
+  }
+
   private boolean fileExist(String filePath) {
     return new File(filePath).exists();
   }
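
Not part of this patch, but as a minimal, self-contained sketch of how the
records used above fit together (the class name, the "zk" tag, priority 0
and the 1024 MB / 1 vcore sizing are illustrative assumptions, not taken
from this commit), an anti-affinity ask could be built like this:

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;

public class SchedulingRequestSketch {
  // Ask for one container tagged "zk" that must not land on a node that
  // already hosts a container carrying the "zk" allocation tag.
  public static SchedulingRequest antiAffineZkRequest(long allocationId) {
    PlacementConstraint antiAffinity = PlacementConstraints.build(
        PlacementConstraints.targetNotIn(PlacementConstraints.NODE,
            PlacementConstraints.PlacementTargets.allocationTag("zk")));
    SchedulingRequest request = SchedulingRequest.newInstance(
        allocationId,
        Priority.newInstance(0),            // illustrative priority
        ExecutionTypeRequest.newInstance(), // defaults to GUARANTEED
        Collections.singleton("zk"),        // source allocation tag
        ResourceSizing.newInstance(Resource.newInstance(1024, 1)),
        null);
    request.setPlacementConstraint(antiAffinity);
    return request;
  }
}

This mirrors setupSchedulingRequest() above, which derives the tag, count
and constraint from the parsed PlacementSpec instead of hard-coding them.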

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e60f5129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index ef635d3..2aafa94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -188,6 +188,8 @@ public class Client {
   // Whether to auto promote opportunistic containers
   private boolean autoPromoteContainers = false;
 
+  // Placement specification
+  private String placementSpec = "";
   // log4j.properties file 
   // if available, add to local resources and set into classpath 
   private String log4jPropFile = "";	
@@ -366,6 +368,10 @@ public class Client {
         "If container could retry, it specifies max retires");
     opts.addOption("container_retry_interval", true,
         "Interval between each retry, unit is milliseconds");
+    opts.addOption("placement_spec", true,
+        "Placement specification. Please note, if this option is specified,"
+            + " The \"num_containers\" option will be ignored. All requested"
+            + " containers will be of type GUARANTEED" );
   }
 
   /**
@@ -419,6 +425,11 @@ public class Client {
       keepContainers = true;
     }
 
+    if (cliParser.hasOption("placement_spec")) {
+      placementSpec = cliParser.getOptionValue("placement_spec");
+      // Check if it is parsable
+      PlacementSpec.parse(this.placementSpec);
+    }
     appName = cliParser.getOptionValue("appname", "DistributedShell");
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");
@@ -834,6 +845,9 @@ public class Client {
       vargs.add("--container_resource_profile " + containerResourceProfile);
     }
     vargs.add("--num_containers " + String.valueOf(numContainers));
+    if (placementSpec != null && placementSpec.length() > 0) {
+      vargs.add("--placement_spec " + placementSpec);
+    }
     if (null != nodeLabelExpression) {
       appContext.setNodeLabelExpression(nodeLabelExpression);
     }
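
As a hedged usage note (the tag names and counts below are placeholders,
not taken from this patch), the new flag is appended to an otherwise normal
DistributedShell client invocation, and its value follows the grammar
documented in PlacementSpec.java further down, for example:

  -placement_spec "zk=3,notin,node,zk:hbase=5,cardinality,node,hbase,0,3"

This would ask for three containers tagged "zk" that are anti-affine to one
another at node scope, plus five containers tagged "hbase" constrained to
nodes where the "hbase" cardinality stays within [0, 3]; per the option
help text above, "num_containers" is then ignored and all requested
containers are GUARANTEED.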

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e60f5129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
new file mode 100644
index 0000000..ed13ee0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Class encapsulating a SourceTag, number of containers and a Placement
+ * Constraint.
+ */
+public class PlacementSpec {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PlacementSpec.class);
+  private static final String SPEC_DELIM = ":";
+  private static final String KV_SPLIT_DELIM = "=";
+  private static final String SPEC_VAL_DELIM = ",";
+  private static final String IN = "in";
+  private static final String NOT_IN = "notin";
+  private static final String CARDINALITY = "cardinality";
+
+  public final String sourceTag;
+  public final int numContainers;
+  public final PlacementConstraint constraint;
+
+  public PlacementSpec(String sourceTag, int numContainers,
+      PlacementConstraint constraint) {
+    this.sourceTag = sourceTag;
+    this.numContainers = numContainers;
+    this.constraint = constraint;
+  }
+
+  // Placement specification should be of the form:
+  // PlacementSpec => ""|KeyVal:PlacementSpec
+  // KeyVal => SourceTag=Constraint
+  // SourceTag => String
+  // Constraint => NumContainers|
+  //               NumContainers,"in",Scope,TargetTag|
+  //               NumContainers,"notin",Scope,TargetTag|
+  //               NumContainers,"cardinality",Scope,TargetTag,MinCard,MaxCard
+  // NumContainers => int (number of containers)
+  // Scope => "NODE"|"RACK"
+  // TargetTag => String (Target Tag)
+  // MinCard => int (min cardinality - needed if ConstraintType == cardinality)
+  // MaxCard => int (max cardinality - needed if ConstraintType == cardinality)
+
+  /**
+   * Parser to convert a string representation of a placement spec to a
+   * mapping from source tag to Placement Constraint.
+   *
+   * @param specs Placement spec.
+   * @return Mapping from source tag to placement constraint.
+   */
+  public static Map<String, PlacementSpec> parse(String specs) {
+    LOG.info("Parsing Placement Specs: [{}]", specs);
+    Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM);
+    Map<String, PlacementSpec> pSpecs = new HashMap<>();
+    while (s.hasNext()) {
+      String sp = s.next();
+      LOG.info("Parsing Spec: [{}]", sp);
+      String[] specSplit = sp.split(KV_SPLIT_DELIM);
+      String sourceTag = specSplit[0];
+      Scanner ps = new Scanner(specSplit[1]).useDelimiter(SPEC_VAL_DELIM);
+      int numContainers = ps.nextInt();
+      if (!ps.hasNext()) {
+        pSpecs.put(sourceTag,
+            new PlacementSpec(sourceTag, numContainers, null));
+        LOG.info("Creating Spec without constraint {}: num[{}]",
+            sourceTag, numContainers);
+        continue;
+      }
+      String cType = ps.next().toLowerCase();
+      String scope = ps.next().toLowerCase();
+
+      String targetTag = ps.next();
+      scope = scope.equals("rack") ? PlacementConstraints.RACK :
+          PlacementConstraints.NODE;
+
+      PlacementConstraint pc;
+      if (cType.equals(IN)) {
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetIn(scope,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " +
+                "scope[{}], target[{}]",
+            sourceTag, numContainers, scope, targetTag);
+      } else if (cType.equals(NOT_IN)) {
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(scope,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " +
+                "scope[{}], target[{}]",
+            sourceTag, numContainers, scope, targetTag);
+      } else if (cType.equals(CARDINALITY)) {
+        int minCard = ps.nextInt();
+        int maxCard = ps.nextInt();
+        pc = PlacementConstraints.build(
+            PlacementConstraints.targetCardinality(scope, minCard, maxCard,
+                PlacementConstraints.PlacementTargets.allocationTag(
+                    targetTag)));
+        LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " +
+                "scope[{}], min[{}], max[{}], target[{}]",
+            sourceTag, numContainers, scope, minCard, maxCard, targetTag);
+      } else {
+        throw new RuntimeException(
+            "Could not parse constraintType [" + cType + "]" +
+                " in [" + specSplit[1] + "]");
+      }
+      pSpecs.put(sourceTag, new PlacementSpec(sourceTag, numContainers, pc));
+    }
+    return pSpecs;
+  }
+}
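
Not part of this patch, but a minimal sketch of how the parser above could
be exercised (the spec string and tag names are made up for illustration;
only PlacementSpec.parse() and the public fields shown above are assumed):

import java.util.Map;

import org.apache.hadoop.yarn.applications.distributedshell.PlacementSpec;

public class PlacementSpecParseSketch {
  public static void main(String[] args) {
    // "zk": 3 containers, anti-affine to other "zk" containers at node scope.
    // "hbase": 5 containers, affine to "zk" containers at rack scope.
    Map<String, PlacementSpec> specs =
        PlacementSpec.parse("zk=3,notin,node,zk:hbase=5,in,rack,zk");
    for (PlacementSpec spec : specs.values()) {
      System.out.println(spec.sourceTag
          + " -> containers=" + spec.numContainers
          + ", constraint=" + spec.constraint);
    }
  }
}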

