You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/13 22:53:03 UTC

[11/74] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
new file mode 100644
index 0000000..16c2435
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -0,0 +1,2489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterDescriptionKeys;
+import org.apache.slider.api.ClusterDescriptionOperations;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.StatusKeys;
+import org.apache.slider.api.types.ApplicationLivenessInformation;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.RoleStatistics;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.SliderInternalStateException;
+import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
+import org.apache.slider.core.persist.AggregateConfSerDeser;
+import org.apache.slider.core.persist.ConfTreeSerDeser;
+import org.apache.slider.providers.PlacementPolicy;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.LongGauge;
+import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
+import org.apache.slider.server.appmaster.management.MetricsConstants;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
+import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.api.RoleKeys.*;
+import static org.apache.slider.api.StateValues.*;
+
+/**
+ * The model of all the ongoing state of a Slider AM.
+ *
+ * concurrency rules: any method which begins with <i>build</i>
+ * is not synchronized and intended to be used during
+ * initialization.
+ */
+public class AppState {
+  protected static final Logger log =
+    LoggerFactory.getLogger(AppState.class);
+  
+  private final AbstractClusterServices recordFactory;
+
+  private final MetricsAndMonitoring metricsAndMonitoring;
+
+  /**
+   * Flag set to indicate the application is live -this only happens
+   * after the buildInstance operation
+   */
+  private boolean applicationLive = false;
+
+  /**
+   * The definition of the instance. Flexing updates the resources section
+   * This is used as a synchronization point on activities that update
+   * the CD, and also to update some of the structures that
+   * feed in to the CD
+   */
+  private AggregateConf instanceDefinition;
+
+  /**
+   * Time the instance definition snapshots were created
+   */
+  private long snapshotTime;
+
+  /**
+   * Snapshot of the instance definition. This is fully
+   * resolved.
+   */
+  private AggregateConf instanceDefinitionSnapshot;
+
+  /**
+   * Snapshot of the raw instance definition; unresolved and
+   * without any patch of an AM into it.
+   */
+  private AggregateConf unresolvedInstanceDefinition;
+
+  /**
+   * snapshot of resources as of last update time
+   */
+  private ConfTreeOperations resourcesSnapshot;
+  private ConfTreeOperations appConfSnapshot;
+  private ConfTreeOperations internalsSnapshot;
+
+  /**
+   * This is the status, the live model
+   */
+  private ClusterDescription clusterStatus = new ClusterDescription();
+
+  /**
+   * Metadata provided by the AM for use in filling in status requests
+   */
+  private Map<String, String> applicationInfo;
+
+  /**
+   * Client properties created via the provider -static for the life
+   * of the application
+   */
+  private Map<String, String> clientProperties = new HashMap<>();
+
+  /**
+   * This is a template of the cluster status
+   */
+  private ClusterDescription clusterStatusTemplate = new ClusterDescription();
+
+  private final Map<Integer, RoleStatus> roleStatusMap =
+    new ConcurrentSkipListMap<>();
+
+  private final Map<String, ProviderRole> roles =
+    new ConcurrentHashMap<>();
+
+  private final ConcurrentSkipListMap<Integer, ProviderRole> rolePriorityMap =
+    new ConcurrentSkipListMap<>();
+
+  /**
+   * The master node.
+   */
+  private RoleInstance appMasterNode;
+
+  /**
+   * Hash map of the containers we have. This includes things that have
+   * been allocated but are not live; it is a superset of the live list
+   */
+  private final ConcurrentMap<ContainerId, RoleInstance> ownedContainers =
+    new ConcurrentHashMap<>();
+
+  /**
+   * Hash map of the containers we have released, but we
+   * are still awaiting acknowledgements on. Any failure of these
+   * containers is treated as a successful outcome
+   */
+  private final ConcurrentMap<ContainerId, Container> containersBeingReleased =
+    new ConcurrentHashMap<>();
+  
+  /**
+   * Counter for completed containers ( complete denotes successful or failed )
+   */
+  private final LongGauge completedContainerCount = new LongGauge();
+
+  /**
+   *   Count of failed containers
+   */
+  private final LongGauge failedContainerCount = new LongGauge();
+
+  /**
+   * # of started containers
+   */
+  private final LongGauge startedContainers = new LongGauge();
+
+  /**
+   * # of containers that failed to start 
+   */
+  private final LongGauge startFailedContainerCount = new LongGauge();
+
+  /**
+   * Track the number of surplus containers received and discarded
+   */
+  private final LongGauge surplusContainers = new LongGauge();
+
+  /**
+   * Track the number of requested containers.
+   * Important: this does not include AA requests which are yet to be issued.
+   */
+  private final LongGauge outstandingContainerRequests = new LongGauge();
+
+  /**
+   * Map of requested nodes. This records the command used to start it,
+   * resources, etc. When container started callback is received,
+   * the node is promoted from here to the containerMap
+   */
+  private final Map<ContainerId, RoleInstance> startingContainers =
+    new ConcurrentHashMap<>();
+
+  /**
+   * List of completed nodes. This isn't kept in the CD as it gets too
+   * big for the RPC responses. Indeed, we should think about how deep to get this
+   */
+  private final Map<ContainerId, RoleInstance> completedContainers
+    = new ConcurrentHashMap<>();
+
+  /**
+   * Nodes that failed to start.
+   * Again, kept out of the CD
+   */
+  private final Map<ContainerId, RoleInstance> failedContainers =
+    new ConcurrentHashMap<>();
+
+  /**
+   * Nodes that came assigned to a role above that
+   * which were asked for -this appears to happen
+   */
+  private final Set<ContainerId> surplusNodes = new HashSet<>();
+
+  /**
+   * Map of containerID to cluster nodes, for status reports.
+   * Access to this should be synchronized on the clusterDescription
+   */
+  private final Map<ContainerId, RoleInstance> liveNodes =
+    new ConcurrentHashMap<>();
+  private final AtomicInteger completionOfNodeNotInLiveListEvent =
+    new AtomicInteger();
+  private final AtomicInteger completionOfUnknownContainerEvent =
+    new AtomicInteger();
+
+
+  /**
+   * limits of container core numbers in this queue
+   */
+  private int containerMaxCores;
+  private int containerMinCores;
+
+  /**
+   * limits of container memory in this queue
+   */
+  private int containerMaxMemory;
+  private int containerMinMemory;
+
+  private RoleHistory roleHistory;
+  private Configuration publishedProviderConf;
+  private long startTimeThreshold;
+
+  private int failureThreshold = 10;
+  private int nodeFailureThreshold = 3;
+
+  private String logServerURL = "";
+
+  /**
+   * Selector of containers to release; application wide.
+   */
+  private ContainerReleaseSelector containerReleaseSelector;
+  private Resource minResource;
+  private Resource maxResource;
+
+  /**
+   * Create an instance
+   * @param recordFactory factory for YARN records
+   * @param metricsAndMonitoring metrics and monitoring services
+   */
+  public AppState(AbstractClusterServices recordFactory,
+      MetricsAndMonitoring metricsAndMonitoring) {
+    Preconditions.checkArgument(recordFactory != null, "null recordFactory");
+    Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring");
+    this.recordFactory = recordFactory;
+    this.metricsAndMonitoring = metricsAndMonitoring;
+
+    // register any metrics
+    register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests);
+    register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers);
+    register(MetricsConstants.CONTAINERS_STARTED, startedContainers);
+    register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount);
+    register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount);
+    register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount);
+  }
+
+  private void register(String name, Metric counter) {
+    this.metricsAndMonitoring.getMetrics().register(
+        MetricRegistry.name(AppState.class, name), counter);
+  }
+
+  public long getFailedCountainerCount() {
+    return failedContainerCount.getCount();
+  }
+
+  /**
+   * Increment the count
+   */
+  public void incFailedCountainerCount() {
+    failedContainerCount.inc();
+  }
+
+  public long getStartFailedCountainerCount() {
+    return startFailedContainerCount.getCount();
+  }
+
+  /**
+   * Increment the count and return the new value
+   */
+  public void incStartedCountainerCount() {
+    startedContainers.inc();
+  }
+
+  public long getStartedCountainerCount() {
+    return startedContainers.getCount();
+  }
+
+  /**
+   * Increment the count and return the new value
+   */
+  public void incStartFailedCountainerCount() {
+    startFailedContainerCount.inc();
+  }
+
+  public AtomicInteger getCompletionOfNodeNotInLiveListEvent() {
+    return completionOfNodeNotInLiveListEvent;
+  }
+
+  public AtomicInteger getCompletionOfUnknownContainerEvent() {
+    return completionOfUnknownContainerEvent;
+  }
+
+
+  public Map<Integer, RoleStatus> getRoleStatusMap() {
+    return roleStatusMap;
+  }
+  
+  protected Map<String, ProviderRole> getRoleMap() {
+    return roles;
+  }
+
+  public Map<Integer, ProviderRole> getRolePriorityMap() {
+    return rolePriorityMap;
+  }
+
+  private Map<ContainerId, RoleInstance> getStartingContainers() {
+    return startingContainers;
+  }
+
+  private Map<ContainerId, RoleInstance> getCompletedContainers() {
+    return completedContainers;
+  }
+
+  public Map<ContainerId, RoleInstance> getFailedContainers() {
+    return failedContainers;
+  }
+
+  public Map<ContainerId, RoleInstance> getLiveContainers() {
+    return liveNodes;
+  }
+
+  /**
+   * Get the current view of the cluster status.
+   * <p>
+   *   Calls to {@link #refreshClusterStatus()} trigger a
+   *   refresh of this field.
+   * <p>
+   * This is read-only
+   * to the extent that changes here do not trigger updates in the
+   * application state. 
+   * @return the cluster status
+   */
+  public synchronized ClusterDescription getClusterStatus() {
+    return clusterStatus;
+  }
+
+  @VisibleForTesting
+  protected synchronized void setClusterStatus(ClusterDescription clusterDesc) {
+    this.clusterStatus = clusterDesc;
+  }
+
+  /**
+   * Set the instance definition -this also builds the (now obsolete)
+   * cluster specification from it.
+   * 
+   * Important: this is for early binding and must not be used after the build
+   * operation is complete. 
+   * @param definition initial definition
+   * @throws BadConfigException
+   */
+  public synchronized void setInitialInstanceDefinition(AggregateConf definition)
+      throws BadConfigException, IOException {
+    log.debug("Setting initial instance definition");
+    // snapshot the definition
+    AggregateConfSerDeser serDeser = new AggregateConfSerDeser();
+
+    unresolvedInstanceDefinition = serDeser.fromInstance(definition);
+    
+    this.instanceDefinition = serDeser.fromInstance(definition);
+    onInstanceDefinitionUpdated();
+  }
+
+  public synchronized AggregateConf getInstanceDefinition() {
+    return instanceDefinition;
+  }
+
+  /**
+   * Get the role history of the application
+   * @return the role history
+   */
+  @VisibleForTesting
+  public RoleHistory getRoleHistory() {
+    return roleHistory;
+  }
+
+  /**
+   * Get the path used for history files
+   * @return the directory used for history files
+   */
+  @VisibleForTesting
+  public Path getHistoryPath() {
+    return roleHistory.getHistoryPath();
+  }
+
+  /**
+   * Set the container limits -the min and max values for
+   * resource requests. All requests must be multiples of the min
+   * values.
+   * @param minMemory min memory MB
+   * @param maxMemory maximum memory
+   * @param minCores min v core count
+   * @param maxCores maximum cores
+   */
+  public void setContainerLimits(int minMemory,int maxMemory, int minCores, int maxCores) {
+    containerMinCores = minCores;
+    containerMaxCores = maxCores;
+    containerMinMemory = minMemory;
+    containerMaxMemory = maxMemory;
+    minResource = recordFactory.newResource(containerMinMemory, containerMinCores);
+    maxResource = recordFactory.newResource(containerMaxMemory, containerMaxCores);
+  }
+
+  public ConfTreeOperations getResourcesSnapshot() {
+    return resourcesSnapshot;
+  }
+
+  public ConfTreeOperations getAppConfSnapshot() {
+    return appConfSnapshot;
+  }
+
+  public ConfTreeOperations getInternalsSnapshot() {
+    return internalsSnapshot;
+  }
+
+  public boolean isApplicationLive() {
+    return applicationLive;
+  }
+
+  public long getSnapshotTime() {
+    return snapshotTime;
+  }
+
+  public synchronized AggregateConf getInstanceDefinitionSnapshot() {
+    return instanceDefinitionSnapshot;
+  }
+
+  public AggregateConf getUnresolvedInstanceDefinition() {
+    return unresolvedInstanceDefinition;
+  }
+
+  public synchronized void buildInstance(AppStateBindingInfo binding)
+      throws BadClusterStateException, BadConfigException, IOException {
+    binding.validate();
+
+    log.debug("Building application state");
+    publishedProviderConf = binding.publishedProviderConf;
+    applicationInfo = binding.applicationInfo != null ? binding.applicationInfo
+                        : new HashMap<String, String>();
+
+    clientProperties = new HashMap<>();
+    containerReleaseSelector = binding.releaseSelector;
+
+
+    Set<String> confKeys = ConfigHelper.sortedConfigKeys(publishedProviderConf);
+
+    //  Add the -site configuration properties
+    for (String key : confKeys) {
+      String val = publishedProviderConf.get(key);
+      clientProperties.put(key, val);
+    }
+
+    // set the cluster specification (once its dependency the client properties
+    // is out the way
+    setInitialInstanceDefinition(binding.instanceDefinition);
+
+    //build the initial role list
+    List<ProviderRole> roleList = new ArrayList<>(binding.roles);
+    for (ProviderRole providerRole : roleList) {
+      buildRole(providerRole);
+    }
+
+    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+
+    Set<String> roleNames = resources.getComponentNames();
+    for (String name : roleNames) {
+      if (roles.containsKey(name)) {
+        continue;
+      }
+      if (hasUniqueNames(resources, name)) {
+        log.info("Skipping group {}", name);
+        continue;
+      }
+      // this is a new value
+      log.info("Adding role {}", name);
+      MapOperations resComponent = resources.getComponent(name);
+      ProviderRole dynamicRole = createDynamicProviderRole(name, resComponent);
+      buildRole(dynamicRole);
+      roleList.add(dynamicRole);
+    }
+    //then pick up the requirements
+    buildRoleRequirementsFromResources();
+
+    //set the livespan
+    MapOperations globalResOpts = instanceDefinition.getResourceOperations().getGlobalOptions();
+
+    startTimeThreshold = globalResOpts.getOptionInt(
+        InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE,
+        InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE);
+
+    failureThreshold = globalResOpts.getOptionInt(
+        CONTAINER_FAILURE_THRESHOLD,
+        DEFAULT_CONTAINER_FAILURE_THRESHOLD);
+    nodeFailureThreshold = globalResOpts.getOptionInt(
+        NODE_FAILURE_THRESHOLD,
+        DEFAULT_NODE_FAILURE_THRESHOLD);
+    initClusterStatus();
+
+
+    // set up the role history
+    roleHistory = new RoleHistory(roleStatusMap.values(), recordFactory);
+    roleHistory.register(metricsAndMonitoring);
+    roleHistory.onStart(binding.fs, binding.historyPath);
+    // trigger first node update
+    roleHistory.onNodesUpdated(binding.nodeReports);
+
+
+    //rebuild any live containers
+    rebuildModelFromRestart(binding.liveContainers);
+
+    // any am config options to pick up
+    logServerURL = binding.serviceConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, "");
+    //mark as live
+    applicationLive = true;
+  }
+
+  public void initClusterStatus() {
+    //copy into cluster status. 
+    ClusterDescription status = ClusterDescription.copy(clusterStatusTemplate);
+    status.state = STATE_CREATED;
+    MapOperations infoOps = new MapOperations("info", status.info);
+    infoOps.mergeWithoutOverwrite(applicationInfo);
+    SliderUtils.addBuildInfo(infoOps, "status");
+
+    long now = now();
+    status.setInfoTime(StatusKeys.INFO_LIVE_TIME_HUMAN,
+                              StatusKeys.INFO_LIVE_TIME_MILLIS,
+                              now);
+    SliderUtils.setInfoTime(infoOps,
+        StatusKeys.INFO_LIVE_TIME_HUMAN,
+        StatusKeys.INFO_LIVE_TIME_MILLIS,
+        now);
+    if (0 == status.createTime) {
+      status.createTime = now;
+      SliderUtils.setInfoTime(infoOps,
+          StatusKeys.INFO_CREATE_TIME_HUMAN,
+          StatusKeys.INFO_CREATE_TIME_MILLIS,
+          now);
+    }
+    status.state = STATE_LIVE;
+
+      //set the app state to this status
+    setClusterStatus(status);
+  }
+
+  /**
+   * Build a dynamic provider role
+   * @param name name of role
+   * @return a new provider role
+   * @throws BadConfigException bad configuration
+   */
+  public ProviderRole createDynamicProviderRole(String name, MapOperations component)
+      throws BadConfigException {
+    return createDynamicProviderRole(name, name, component);
+  }
+
+  /**
+   * Build a dynamic provider role
+   * @param name name of role
+   * @param group group of role
+   * @return a new provider role
+   * @throws BadConfigException bad configuration
+   */
+  public ProviderRole createDynamicProviderRole(String name, String group, MapOperations component)
+      throws BadConfigException {
+    String priOpt = component.getMandatoryOption(COMPONENT_PRIORITY);
+    int priority = SliderUtils.parseAndValidate(
+        "value of " + name + " " + COMPONENT_PRIORITY, priOpt, 0, 1, -1);
+
+    String placementOpt = component.getOption(COMPONENT_PLACEMENT_POLICY,
+        Integer.toString(PlacementPolicy.DEFAULT));
+
+    int placement = SliderUtils.parseAndValidate(
+        "value of " + name + " " + COMPONENT_PLACEMENT_POLICY, placementOpt, 0, 0, -1);
+
+    int placementTimeout = component.getOptionInt(PLACEMENT_ESCALATE_DELAY,
+            DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS);
+
+    ProviderRole newRole = new ProviderRole(name,
+        group,
+        priority,
+        placement,
+        getNodeFailureThresholdForRole(group),
+        placementTimeout,
+        component.getOption(YARN_LABEL_EXPRESSION, DEF_YARN_LABEL_EXPRESSION));
+    log.info("New {} ", newRole);
+    return newRole;
+  }
+
+  /**
+   * Actions to perform when an instance definition is updated
+   * Currently: 
+   * <ol>
+   *   <li>
+   *     resolve the configuration
+   *   </li>
+   *   <li>
+   *     update the cluster spec derivative
+   *   </li>
+   * </ol>
+   *  
+   * @throws BadConfigException
+   */
+  private synchronized void onInstanceDefinitionUpdated()
+      throws BadConfigException, IOException {
+
+    log.debug("Instance definition updated");
+    //note the time 
+    snapshotTime = now();
+
+    for (String component : instanceDefinition.getResourceOperations().getComponentNames()) {
+      instanceDefinition.getAppConfOperations().getOrAddComponent(component);
+    }
+
+    // resolve references if not already done
+    instanceDefinition.resolve();
+
+    // force in the AM desired state values
+    ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+
+    if (resources.getComponent(SliderKeys.COMPONENT_AM) != null) {
+      resources.setComponentOpt(
+          SliderKeys.COMPONENT_AM, COMPONENT_INSTANCES, "1");
+    }
+
+
+    //snapshot all three sectons
+    resourcesSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getResources());
+    appConfSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getAppConf());
+    internalsSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getInternal());
+    //build a new aggregate from the snapshots
+    instanceDefinitionSnapshot = new AggregateConf(resourcesSnapshot.confTree,
+                                                   appConfSnapshot.confTree,
+                                                   internalsSnapshot.confTree);
+    instanceDefinitionSnapshot.setName(instanceDefinition.getName());
+
+    clusterStatusTemplate = ClusterDescriptionOperations.buildFromInstanceDefinition(
+          instanceDefinition);
+
+    // Add the -site configuration properties
+    for (Map.Entry<String, String> prop : clientProperties.entrySet()) {
+      clusterStatusTemplate.clientProperties.put(prop.getKey(), prop.getValue());
+    }
+
+  }
+
+  /**
+   * The resource configuration is updated -review and update state.
+   * @param resources updated resources specification
+   * @return a list of any dynamically added provider roles
+   * (purely for testing purposes)
+   */
+  @VisibleForTesting
+  public synchronized List<ProviderRole> updateResourceDefinitions(ConfTree resources)
+      throws BadConfigException, IOException {
+    log.debug("Updating resources to {}", resources);
+    // snapshot the (possibly unresolved) values
+    ConfTreeSerDeser serDeser = new ConfTreeSerDeser();
+    unresolvedInstanceDefinition.setResources(
+        serDeser.fromInstance(resources));
+    // assign another copy under the instance definition for resolving
+    // and then driving application size
+    instanceDefinition.setResources(serDeser.fromInstance(resources));
+    onInstanceDefinitionUpdated();
+
+    // propagate the role table
+    Map<String, Map<String, String>> updated = resources.components;
+    getClusterStatus().roles = SliderUtils.deepClone(updated);
+    getClusterStatus().updateTime = now();
+    return buildRoleRequirementsFromResources();
+  }
+
+  /**
+   * build the role requirements from the cluster specification
+   * @return a list of any dynamically added provider roles
+   */
+  private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException {
+
+    List<ProviderRole> newRoles = new ArrayList<>(0);
+
+    // now update every role's desired count.
+    // if there are no instance values, that role count goes to zero
+
+    ConfTreeOperations resources =
+        instanceDefinition.getResourceOperations();
+
+    // Add all the existing roles
+    Map<String, Integer> groupCounts = new HashMap<>();
+    for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+      if (roleStatus.isExcludeFromFlexing()) {
+        // skip inflexible roles, e.g AM itself
+        continue;
+      }
+      long currentDesired = roleStatus.getDesired();
+      String role = roleStatus.getName();
+      String roleGroup = roleStatus.getGroup();
+      int desiredInstanceCount = getDesiredInstanceCount(resources, roleGroup);
+
+      int newDesired = desiredInstanceCount;
+      if (hasUniqueNames(resources, roleGroup)) {
+        Integer groupCount = 0;
+        if (groupCounts.containsKey(roleGroup)) {
+          groupCount = groupCounts.get(roleGroup);
+        }
+
+        newDesired = desiredInstanceCount - groupCount;
+
+        if (newDesired > 0) {
+          newDesired = 1;
+          groupCounts.put(roleGroup, groupCount + newDesired);
+        } else {
+          newDesired = 0;
+        }
+      }
+
+      if (newDesired == 0) {
+        log.info("Role {} has 0 instances specified", role);
+      }
+      if (currentDesired != newDesired) {
+        log.info("Role {} flexed from {} to {}", role, currentDesired,
+            newDesired);
+        roleStatus.setDesired(newDesired);
+      }
+    }
+
+    // now the dynamic ones. Iterate through the the cluster spec and
+    // add any role status entries not in the role status
+    Set<String> roleNames = resources.getComponentNames();
+    for (String name : roleNames) {
+      if (roles.containsKey(name)) {
+        continue;
+      }
+      if (hasUniqueNames(resources, name)) {
+        // THIS NAME IS A GROUP
+        int desiredInstanceCount = getDesiredInstanceCount(resources, name);
+        Integer groupCount = 0;
+        if (groupCounts.containsKey(name)) {
+          groupCount = groupCounts.get(name);
+        }
+        for (int i = groupCount + 1; i <= desiredInstanceCount; i++) {
+          int priority = resources.getComponentOptInt(name, COMPONENT_PRIORITY, i);
+          // this is a new instance of an existing group
+          String newName = String.format("%s%d", name, i);
+          int newPriority = getNewPriority(priority + i - 1);
+          log.info("Adding new role {}", newName);
+          MapOperations component = resources.getComponent(name,
+              Collections.singletonMap(COMPONENT_PRIORITY,
+                  Integer.toString(newPriority)));
+          if (component == null) {
+            throw new BadConfigException("Component is null for name = " + name
+                + ", newPriority =" + newPriority);
+          }
+          ProviderRole dynamicRole = createDynamicProviderRole(newName, name, component);
+          RoleStatus roleStatus = buildRole(dynamicRole);
+          roleStatus.setDesired(1);
+          log.info("New role {}", roleStatus);
+          if (roleHistory != null) {
+            roleHistory.addNewRole(roleStatus);
+          }
+          newRoles.add(dynamicRole);
+        }
+      } else {
+        // this is a new value
+        log.info("Adding new role {}", name);
+        MapOperations component = resources.getComponent(name);
+        ProviderRole dynamicRole = createDynamicProviderRole(name, component);
+        RoleStatus roleStatus = buildRole(dynamicRole);
+        roleStatus.setDesired(getDesiredInstanceCount(resources, name));
+        log.info("New role {}", roleStatus);
+        if (roleHistory != null) {
+          roleHistory.addNewRole(roleStatus);
+        }
+        newRoles.add(dynamicRole);
+      }
+    }
+    // and fill in all those roles with their requirements
+    buildRoleResourceRequirements();
+
+    return newRoles;
+  }
+
+  private int getNewPriority(int start) {
+    if (!rolePriorityMap.containsKey(start)) {
+      return start;
+    }
+    return rolePriorityMap.lastKey() + 1;
+  }
+
+  /**
+   * Get the desired instance count of a role, rejecting negative values
+   * @param resources resource map
+   * @param roleGroup role group
+   * @return the instance count
+   * @throws BadConfigException if the count is negative
+   */
+  private int getDesiredInstanceCount(ConfTreeOperations resources,
+      String roleGroup) throws BadConfigException {
+    int desiredInstanceCount =
+      resources.getComponentOptInt(roleGroup, COMPONENT_INSTANCES, 0);
+
+    if (desiredInstanceCount < 0) {
+      log.error("Role {} has negative desired instances : {}", roleGroup,
+          desiredInstanceCount);
+      throw new BadConfigException(
+          "Negative instance count (%) requested for component %s",
+          desiredInstanceCount, roleGroup);
+    }
+    return desiredInstanceCount;
+  }
+
+  private Boolean hasUniqueNames(ConfTreeOperations resources, String group) {
+    MapOperations component = resources.getComponent(group);
+    if (component == null) {
+      log.info("Component was null for {} when checking unique names", group);
+      return Boolean.FALSE;
+    }
+    return component.getOptionBool(UNIQUE_NAMES, Boolean.FALSE);
+  }
+
+  /**
+   * Add knowledge of a role.
+   * This is a build-time operation that is not synchronized, and
+   * should be used while setting up the system state -before servicing
+   * requests.
+   * @param providerRole role to add
+   * @return the role status built up
+   * @throws BadConfigException if a role of that priority already exists
+   */
+  public RoleStatus buildRole(ProviderRole providerRole) throws BadConfigException {
+    // build role status map
+    int priority = providerRole.id;
+    if (roleStatusMap.containsKey(priority)) {
+      throw new BadConfigException("Duplicate Provider Key: %s and %s",
+                                   providerRole,
+                                   roleStatusMap.get(priority));
+    }
+    RoleStatus roleStatus = new RoleStatus(providerRole);
+    roleStatusMap.put(priority, roleStatus);
+    String name = providerRole.name;
+    roles.put(name, providerRole);
+    rolePriorityMap.put(priority, providerRole);
+    // register its entries
+    metricsAndMonitoring.addMetricSet(MetricsConstants.PREFIX_SLIDER_ROLES + name, roleStatus);
+    return roleStatus;
+  }
+
+  /**
+   * Build up the requirements of every resource
+   */
+  private void buildRoleResourceRequirements() {
+    for (RoleStatus role : roleStatusMap.values()) {
+      role.setResourceRequirements(
+          buildResourceRequirements(role, recordFactory.newResource()));
+    }
+  }
+
+  /**
+   * build up the special master node, which lives
+   * in the live node set but has a lifecycle bonded to the AM
+   * @param containerId the AM master
+   * @param host hostname
+   * @param amPort port
+   * @param nodeHttpAddress http address: may be null
+   */
+  public void buildAppMasterNode(ContainerId containerId,
+                                 String host,
+                                 int amPort,
+                                 String nodeHttpAddress) {
+    Container container = new ContainerPBImpl();
+    container.setId(containerId);
+    NodeId nodeId = NodeId.newInstance(host, amPort);
+    container.setNodeId(nodeId);
+    container.setNodeHttpAddress(nodeHttpAddress);
+    RoleInstance am = new RoleInstance(container);
+    am.role = SliderKeys.COMPONENT_AM;
+    am.group = SliderKeys.COMPONENT_AM;
+    am.roleId = SliderKeys.ROLE_AM_PRIORITY_INDEX;
+    am.createTime =now();
+    am.startTime = am.createTime;
+    appMasterNode = am;
+    //it is also added to the set of live nodes
+    getLiveContainers().put(containerId, am);
+    putOwnedContainer(containerId, am);
+
+    // patch up the role status
+    RoleStatus roleStatus = roleStatusMap.get(SliderKeys.ROLE_AM_PRIORITY_INDEX);
+    roleStatus.setDesired(1);
+    roleStatus.incActual();
+    roleStatus.incStarted();
+  }
+
+  /**
+   * Note that the master node has been launched,
+   * though it isn't considered live until any forked
+   * processes are running. It is NOT registered with
+   * the role history -the container is incomplete
+   * and it will just cause confusion
+   */
+  public void noteAMLaunched() {
+    getLiveContainers().put(appMasterNode.getContainerId(), appMasterNode);
+  }
+
+  /**
+   * AM declares ourselves live in the cluster description.
+   * This is meant to be triggered from the callback
+   * indicating the spawned process is up and running.
+   */
+  public void noteAMLive() {
+    appMasterNode.state = STATE_LIVE;
+  }
+
+  /**
+   * Look up the status entry of a role or raise an exception
+   * @param key role ID
+   * @return the status entry
+   * @throws RuntimeException if the role cannot be found
+   */
+  public RoleStatus lookupRoleStatus(int key) {
+    RoleStatus rs = getRoleStatusMap().get(key);
+    if (rs == null) {
+      throw new RuntimeException("Cannot find role for role ID " + key);
+    }
+    return rs;
+  }
+
+  /**
+   * Look up the status entry of a container or raise an exception
+   *
+   * @param c container
+   * @return the status entry
+   * @throws RuntimeException if the role cannot be found
+   */
+  public RoleStatus lookupRoleStatus(Container c) {
+    return lookupRoleStatus(ContainerPriority.extractRole(c));
+  }
+
+  /**
+   * Get a deep clone of the role status list. Concurrent events may mean this
+   * list (or indeed, some of the role status entries) may be inconsistent
+   * @return a snapshot of the role status entries
+   */
+  public List<RoleStatus> cloneRoleStatusList() {
+    Collection<RoleStatus> statuses = roleStatusMap.values();
+    List<RoleStatus> statusList = new ArrayList<>(statuses.size());
+    try {
+      for (RoleStatus status : statuses) {
+        statusList.add((RoleStatus)(status.clone()));
+      }
+    } catch (CloneNotSupportedException e) {
+      log.warn("Unexpected cloning failure: {}", e, e);
+    }
+    return statusList;
+  }
+
+
+  /**
+   * Look up a role in the map
+   * @param name role name
+   * @return the instance
+   * @throws YarnRuntimeException if not found
+   */
+  public RoleStatus lookupRoleStatus(String name) throws YarnRuntimeException {
+    ProviderRole providerRole = roles.get(name);
+    if (providerRole == null) {
+      throw new YarnRuntimeException("Unknown role " + name);
+    }
+    return lookupRoleStatus(providerRole.id);
+  }
+
+
+  /**
+   * Clone the list of active (==owned) containers
+   * @return the list of role instances representing all owned containers
+   */
+  public synchronized List<RoleInstance> cloneOwnedContainerList() {
+    Collection<RoleInstance> values = ownedContainers.values();
+    return new ArrayList<>(values);
+  }
+
+  /**
+   * Get the number of active (==owned) containers
+   * @return
+   */
+  public int getNumOwnedContainers() {
+    return ownedContainers.size();
+  }
+  
+  /**
+   * Look up an active container: any container that the AM has, even
+   * if it is not currently running/live
+   */
+  public RoleInstance getOwnedContainer(ContainerId id) {
+    return ownedContainers.get(id);
+  }
+
+  /**
+   * Remove an owned container
+   * @param id container ID
+   * @return the instance removed
+   */
+  private RoleInstance removeOwnedContainer(ContainerId id) {
+    return ownedContainers.remove(id);
+  }
+
+  /**
+   * set/update an owned container
+   * @param id container ID
+   * @param instance
+   * @return
+   */
+  private RoleInstance putOwnedContainer(ContainerId id,
+      RoleInstance instance) {
+    return ownedContainers.put(id, instance);
+  }
+
+  /**
+   * Clone the live container list. This is synchronized.
+   * @return a snapshot of the live node list
+   */
+  public synchronized List<RoleInstance> cloneLiveContainerInfoList() {
+    List<RoleInstance> allRoleInstances;
+    Collection<RoleInstance> values = getLiveContainers().values();
+    allRoleInstances = new ArrayList<>(values);
+    return allRoleInstances;
+  }
+
+  /**
+   * Lookup live instance by string value of container ID
+   * @param containerId container ID as a string
+   * @return the role instance for that container
+   * @throws NoSuchNodeException if it does not exist
+   */
+  public synchronized RoleInstance getLiveInstanceByContainerID(String containerId)
+      throws NoSuchNodeException {
+    Collection<RoleInstance> nodes = getLiveContainers().values();
+    return findNodeInCollection(containerId, nodes);
+  }
+
+  /**
+   * Lookup owned instance by string value of container ID
+   * @param containerId container ID as a string
+   * @return the role instance for that container
+   * @throws NoSuchNodeException if it does not exist
+   */
+  public synchronized RoleInstance getOwnedInstanceByContainerID(String containerId)
+      throws NoSuchNodeException {
+    Collection<RoleInstance> nodes = ownedContainers.values();
+    return findNodeInCollection(containerId, nodes);
+  }
+
+  /**
+   * Iterate through a collection of role instances to find one with a
+   * specific (string) container ID
+   * @param containerId container ID as a string
+   * @param nodes collection
+   * @return the found node 
+   * @throws NoSuchNodeException if there was no match
+   */
+  private RoleInstance findNodeInCollection(String containerId,
+      Collection<RoleInstance> nodes) throws NoSuchNodeException {
+    RoleInstance found = null;
+    for (RoleInstance node : nodes) {
+      if (containerId.equals(node.id)) {
+        found = node;
+        break;
+      }
+    }
+    if (found != null) {
+      return found;
+    } else {
+      //at this point: no node
+      throw new NoSuchNodeException("Unknown node: " + containerId);
+    }
+  }
+
+  public synchronized List<RoleInstance> getLiveInstancesByContainerIDs(
+    Collection<String> containerIDs) {
+    //first, a hashmap of those containerIDs is built up
+    Set<String> uuidSet = new HashSet<String>(containerIDs);
+    List<RoleInstance> nodes = new ArrayList<RoleInstance>(uuidSet.size());
+    Collection<RoleInstance> clusterNodes = getLiveContainers().values();
+
+    for (RoleInstance node : clusterNodes) {
+      if (uuidSet.contains(node.id)) {
+        nodes.add(node);
+      }
+    }
+    //at this point: a possibly empty list of nodes
+    return nodes;
+  }
+
+  /**
+   * Enum all nodes by role. 
+   * @param role role, or "" for all roles
+   * @return a list of nodes, may be empty
+   */
+  public synchronized List<RoleInstance> enumLiveNodesInRole(String role) {
+    List<RoleInstance> nodes = new ArrayList<RoleInstance>();
+    Collection<RoleInstance> allRoleInstances = getLiveContainers().values();
+    for (RoleInstance node : allRoleInstances) {
+      if (role.isEmpty() || role.equals(node.role)) {
+        nodes.add(node);
+      }
+    }
+    return nodes;
+  }
+
+ 
+  /**
+   * enum nodes by role ID, from either the owned or live node list
+   * @param roleId role the container must be in
+   * @param owned flag to indicate "use owned list" rather than the smaller
+   * "live" list
+   * @return a list of nodes, may be empty
+   */
+  public synchronized List<RoleInstance> enumNodesWithRoleId(int roleId,
+      boolean owned) {
+    List<RoleInstance> nodes = new ArrayList<RoleInstance>();
+    Collection<RoleInstance> allRoleInstances;
+    allRoleInstances = owned ? ownedContainers.values() : liveNodes.values();
+    for (RoleInstance node : allRoleInstances) {
+      if (node.roleId == roleId) {
+        nodes.add(node);
+      }
+    }
+    return nodes;
+  }
+
+  /**
+   * Build an instance map.
+   * @return the map of Role name to list of role instances
+   */
+  private synchronized Map<String, List<String>> createRoleToInstanceMap() {
+    Map<String, List<String>> map = new HashMap<String, List<String>>();
+    for (RoleInstance node : getLiveContainers().values()) {
+      List<String> containers = map.get(node.role);
+      if (containers == null) {
+        containers = new ArrayList<String>();
+        map.put(node.role, containers);
+      }
+      containers.add(node.id);
+    }
+    return map;
+  }
+
+  /**
+   * Build a map of role->nodename->node-info
+   * 
+   * @return the map of Role name to list of Cluster Nodes
+   */
+  public synchronized Map<String, Map<String, ClusterNode>> createRoleToClusterNodeMap() {
+    Map<String, Map<String, ClusterNode>> map = new HashMap<>();
+    for (RoleInstance node : getLiveContainers().values()) {
+      
+      Map<String, ClusterNode> containers = map.get(node.role);
+      if (containers == null) {
+        containers = new HashMap<String, ClusterNode>();
+        map.put(node.role, containers);
+      }
+      ClusterNode clusterNode = node.toClusterNode();
+      containers.put(clusterNode.name, clusterNode);
+    }
+    return map;
+  }
+
+  /**
+   * Notification called just before the NM is asked to 
+   * start a container
+   * @param container container to start
+   * @param instance clusterNode structure
+   */
+  public void containerStartSubmitted(Container container,
+                                      RoleInstance instance) {
+    instance.state = STATE_SUBMITTED;
+    instance.container = container;
+    instance.createTime = now();
+    getStartingContainers().put(container.getId(), instance);
+    putOwnedContainer(container.getId(), instance);
+    roleHistory.onContainerStartSubmitted(container, instance);
+  }
+
+  /**
+   * Note that a container has been submitted for release; update internal state
+   * and mark the associated ContainerInfo released field to indicate that
+   * while it is still in the active list, it has been queued for release.
+   *
+   * @param container container
+   * @throws SliderInternalStateException if there is no container of that ID
+   * on the active list
+   */
+  public synchronized void containerReleaseSubmitted(Container container)
+      throws SliderInternalStateException {
+    ContainerId id = container.getId();
+    //look up the container
+    RoleInstance instance = getOwnedContainer(id);
+    if (instance == null) {
+      throw new SliderInternalStateException(
+        "No active container with ID " + id);
+    }
+    //verify that it isn't already released
+    if (containersBeingReleased.containsKey(id)) {
+      throw new SliderInternalStateException(
+        "Container %s already queued for release", id);
+    }
+    instance.released = true;
+    containersBeingReleased.put(id, instance.container);
+    RoleStatus role = lookupRoleStatus(instance.roleId);
+    role.incReleasing();
+    roleHistory.onContainerReleaseSubmitted(container);
+  }
+
+  /**
+   * Create a container request.
+   * Update internal state, such as the role request count. 
+   * Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here.
+   * This is where role history information will be used for placement decisions.
+   * @param role role
+   * @return the container request to submit or null if there is none
+   */
+  private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) {
+    if (role.isAntiAffinePlacement()) {
+      return createAAContainerRequest(role);
+    } else {
+      incrementRequestCount(role);
+      OutstandingRequest request = roleHistory.requestContainerForRole(role);
+      if (request != null) {
+        return request.getIssuedRequest();
+      } else {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Create a container request.
+   * Update internal state, such as the role request count.
+   * Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here.
+   * This is where role history information will be used for placement decisions.
+   * @param role role
+   * @return the container request to submit or null if there is none
+   */
+  private AMRMClient.ContainerRequest createAAContainerRequest(RoleStatus role) {
+    OutstandingRequest request = roleHistory.requestContainerForAARole(role);
+    if (request == null) {
+      return null;
+    }
+    incrementRequestCount(role);
+    role.setOutstandingAArequest(request);
+    return request.getIssuedRequest();
+  }
+
+  /**
+   * Increment the request count of a role.
+   * <p>
+   *   Also updates application state counters
+   * @param role role being requested.
+   */
+  protected void incrementRequestCount(RoleStatus role) {
+    role.incRequested();
+    incOutstandingContainerRequests();
+  }
+
+  /**
+   * Inc #of outstanding requests.
+   */
+  private void incOutstandingContainerRequests() {
+     outstandingContainerRequests.inc();
+  }
+
+  /**
+   * Decrement the number of outstanding requests. This never goes below zero.
+   */
+  private void decOutstandingContainerRequests() {
+    synchronized (outstandingContainerRequests) {
+      if (outstandingContainerRequests.getCount() > 0) {
+        // decrement but never go below zero
+        outstandingContainerRequests.dec();
+      }
+    }
+  }
+
+
+  /**
+   * Get the value of a YARN requirement (cores, RAM, etc).
+   * These are returned as integers, but there is special handling of the 
+   * string {@link ResourceKeys#YARN_RESOURCE_MAX}, which triggers
+   * the return of the maximum value.
+   * @param group component to get from
+   * @param option option name
+   * @param defVal default value
+   * @param maxVal value to return if the max val is requested
+   * @return parsed value
+   * @throws NumberFormatException if the role could not be parsed.
+   */
+  private int getResourceRequirement(ConfTreeOperations resources,
+                                     String group,
+                                     String option,
+                                     int defVal,
+                                     int maxVal) {
+
+    String val = resources.getComponentOpt(group, option,
+        Integer.toString(defVal));
+    Integer intVal;
+    if (YARN_RESOURCE_MAX.equals(val)) {
+      intVal = maxVal;
+    } else {
+      intVal = Integer.decode(val);
+    }
+    return intVal;
+  }
+
+  /**
+   * Build up the resource requirements for this role from the
+   * cluster specification, including substituing max allowed values
+   * if the specification asked for it.
+   * @param role role
+   * @param capability capability to set up. A new one may be created
+   * during normalization
+   */
+  public Resource buildResourceRequirements(RoleStatus role, Resource capability) {
+    // Set up resource requirements from role values
+    String name = role.getName();
+    String group = role.getGroup();
+    ConfTreeOperations resources = getResourcesSnapshot();
+    int cores = getResourceRequirement(resources,
+                                       group,
+                                       YARN_CORES,
+                                       DEF_YARN_CORES,
+                                       containerMaxCores);
+    capability.setVirtualCores(cores);
+    int ram = getResourceRequirement(resources, group,
+                                     YARN_MEMORY,
+                                     DEF_YARN_MEMORY,
+                                     containerMaxMemory);
+    capability.setMemory(ram);
+    log.debug("Component {} has RAM={}, vCores ={}", name, ram, cores);
+    Resource normalized = recordFactory.normalize(capability, minResource,
+        maxResource);
+    if (!Resources.equals(normalized, capability)) {
+      // resource requirements normalized to something other than asked for.
+      // LOG @ WARN so users can see why this is happening.
+      log.warn("Resource requirements of {} normalized" +
+              " from {} to {}", name, capability, normalized);
+    }
+    return normalized;
+  }
+
+  /**
+   * add a launched container to the node map for status responses
+   * @param container id
+   * @param node node details
+   */
+  private void addLaunchedContainer(Container container, RoleInstance node) {
+    node.container = container;
+    if (node.role == null) {
+      throw new RuntimeException(
+        "Unknown role for node " + node);
+    }
+    getLiveContainers().put(node.getContainerId(), node);
+    //tell role history
+    roleHistory.onContainerStarted(container);
+  }
+
+  /**
+   * container start event
+   * @param containerId container that is to be started
+   * @return the role instance, or null if there was a problem
+   */
+  public synchronized RoleInstance onNodeManagerContainerStarted(ContainerId containerId) {
+    try {
+      return innerOnNodeManagerContainerStarted(containerId);
+    } catch (YarnRuntimeException e) {
+      log.error("NodeManager callback on started container {} failed",
+                containerId,
+                e);
+      return null;
+    }
+  }
+
+   /**
+   * container start event handler -throwing an exception on problems
+   * @param containerId container that is to be started
+   * @return the role instance
+   * @throws RuntimeException on problems
+   */
+  @VisibleForTesting
+  public RoleInstance innerOnNodeManagerContainerStarted(ContainerId containerId) {
+    incStartedCountainerCount();
+    RoleInstance instance = getOwnedContainer(containerId);
+    if (instance == null) {
+      //serious problem
+      throw new YarnRuntimeException("Container not in active containers start "+
+                containerId);
+    }
+    if (instance.role == null) {
+      throw new YarnRuntimeException("Component instance has no instance name " +
+                                     instance);
+    }
+    instance.startTime = now();
+    RoleInstance starting = getStartingContainers().remove(containerId);
+    if (null == starting) {
+      throw new YarnRuntimeException(
+        "Container "+ containerId +" is already started");
+    }
+    instance.state = STATE_LIVE;
+    RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
+    roleStatus.incStarted();
+    Container container = instance.container;
+    addLaunchedContainer(container, instance);
+    return instance;
+  }
+
+  /**
+   * update the application state after a failure to start a container.
+   * This is perhaps where blacklisting could be most useful: failure
+   * to start a container is a sign of a more serious problem
+   * than a later exit.
+   *
+   * -relayed from NMClientAsync.CallbackHandler 
+   * @param containerId failing container
+   * @param thrown what was thrown
+   */
+  public synchronized void onNodeManagerContainerStartFailed(ContainerId containerId,
+                                                             Throwable thrown) {
+    removeOwnedContainer(containerId);
+    incFailedCountainerCount();
+    incStartFailedCountainerCount();
+    RoleInstance instance = getStartingContainers().remove(containerId);
+    if (null != instance) {
+      RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
+      String text;
+      if (null != thrown) {
+        text = SliderUtils.stringify(thrown);
+      } else {
+        text = "container start failure";
+      }
+      instance.diagnostics = text;
+      roleStatus.noteFailed(true, text, ContainerOutcome.Failed);
+      getFailedContainers().put(containerId, instance);
+      roleHistory.onNodeManagerContainerStartFailed(instance.container);
+    }
+  }
+
+  /**
+   * Handle node update from the RM. This syncs up the node map with the RM's view
+   * @param updatedNodes updated nodes
+   */
+  public synchronized NodeUpdatedOutcome onNodesUpdated(List<NodeReport> updatedNodes) {
+    boolean changed = roleHistory.onNodesUpdated(updatedNodes);
+    if (changed) {
+      log.info("YARN cluster changed \u2014cancelling current AA requests");
+      List<AbstractRMOperation> operations = cancelOutstandingAARequests();
+      log.debug("Created {} cancel requests", operations.size());
+      return new NodeUpdatedOutcome(true, operations);
+    }
+    return new NodeUpdatedOutcome(false, new ArrayList<AbstractRMOperation>(0));
+  }
+
+  /**
+   * Return value of the {@link #onNodesUpdated(List)} call.
+   */
+  public static class NodeUpdatedOutcome {
+    public final boolean clusterChanged;
+    public final List<AbstractRMOperation> operations;
+
+    public NodeUpdatedOutcome(boolean clusterChanged,
+        List<AbstractRMOperation> operations) {
+      this.clusterChanged = clusterChanged;
+      this.operations = operations;
+    }
+  }
+  /**
+   * Is a role short lived by the threshold set for this application
+   * @param instance instance
+   * @return true if the instance is considered short lived
+   */
+  @VisibleForTesting
+  public boolean isShortLived(RoleInstance instance) {
+    long time = now();
+    long started = instance.startTime;
+    boolean shortlived;
+    if (started > 0) {
+      long duration = time - started;
+      shortlived = duration < (startTimeThreshold * 1000);
+      log.info("Duration {} and startTimeThreshold {}", duration, startTimeThreshold);
+    } else {
+      // never even saw a start event
+      shortlived = true;
+    }
+    return shortlived;
+  }
+
+  /**
+   * Current time in milliseconds. Made protected for
+   * the option to override it in tests.
+   * @return the current time.
+   */
+  protected long now() {
+    return System.currentTimeMillis();
+  }
+
+  /**
+   * This is a very small class to send a multiple result back from 
+   * the completion operation
+   */
+  public static class NodeCompletionResult {
+    public boolean surplusNode = false;
+    public RoleInstance roleInstance;
+    // did the container fail for *any* reason?
+    public boolean containerFailed = false;
+    // detailed outcome on the container failure
+    public ContainerOutcome outcome = ContainerOutcome.Completed;
+    public int exitStatus = 0;
+    public boolean unknownNode = false;
+
+    public String toString() {
+      final StringBuilder sb =
+        new StringBuilder("NodeCompletionResult{");
+      sb.append("surplusNode=").append(surplusNode);
+      sb.append(", roleInstance=").append(roleInstance);
+      sb.append(", exitStatus=").append(exitStatus);
+      sb.append(", containerFailed=").append(containerFailed);
+      sb.append(", outcome=").append(outcome);
+      sb.append(", unknownNode=").append(unknownNode);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * handle completed node in the CD -move something from the live
+   * server list to the completed server list.
+   * @param status the node that has just completed
+   * @return NodeCompletionResult
+   */
+  public synchronized NodeCompletionResult onCompletedNode(ContainerStatus status) {
+    ContainerId containerId = status.getContainerId();
+    NodeCompletionResult result = new NodeCompletionResult();
+    RoleInstance roleInstance;
+
+    int exitStatus = status.getExitStatus();
+    result.exitStatus = exitStatus;
+    if (containersBeingReleased.containsKey(containerId)) {
+      log.info("Container was queued for release : {}", containerId);
+      Container container = containersBeingReleased.remove(containerId);
+      RoleStatus roleStatus = lookupRoleStatus(container);
+      long releasing = roleStatus.decReleasing();
+      long actual = roleStatus.decActual();
+      long completedCount = roleStatus.incCompleted();
+      log.info("decrementing role count for role {} to {}; releasing={}, completed={}",
+          roleStatus.getName(),
+          actual,
+          releasing,
+          completedCount);
+      result.outcome = ContainerOutcome.Completed;
+      roleHistory.onReleaseCompleted(container);
+
+    } else if (surplusNodes.remove(containerId)) {
+      //its a surplus one being purged
+      result.surplusNode = true;
+    } else {
+      // a container has failed or been killed
+      // use the exit code to determine the outcome
+      result.containerFailed = true;
+      result.outcome = ContainerOutcome.fromExitStatus(exitStatus);
+
+      roleInstance = removeOwnedContainer(containerId);
+      if (roleInstance != null) {
+        //it was active, move it to failed 
+        incFailedCountainerCount();
+        failedContainers.put(containerId, roleInstance);
+      } else {
+        // the container may have been noted as failed already, so look
+        // it up
+        roleInstance = failedContainers.get(containerId);
+      }
+      if (roleInstance != null) {
+        int roleId = roleInstance.roleId;
+        String rolename = roleInstance.role;
+        log.info("Failed container in role[{}] : {}", roleId, rolename);
+        try {
+          RoleStatus roleStatus = lookupRoleStatus(roleId);
+          roleStatus.decActual();
+          boolean shortLived = isShortLived(roleInstance);
+          String message;
+          Container failedContainer = roleInstance.container;
+
+          //build the failure message
+          if (failedContainer != null) {
+            String completedLogsUrl = getLogsURLForContainer(failedContainer);
+            message = String.format("Failure %s on host %s (%d): %s",
+                roleInstance.getContainerId(),
+                failedContainer.getNodeId().getHost(),
+                exitStatus,
+                completedLogsUrl);
+          } else {
+            message = String.format("Failure %s (%d)", containerId, exitStatus);
+          }
+          roleStatus.noteFailed(shortLived, message, result.outcome);
+          long failed = roleStatus.getFailed();
+          log.info("Current count of failed role[{}] {} =  {}",
+              roleId, rolename, failed);
+          if (failedContainer != null) {
+            roleHistory.onFailedContainer(failedContainer, shortLived, result.outcome);
+          }
+
+        } catch (YarnRuntimeException e1) {
+          log.error("Failed container of unknown role {}", roleId);
+        }
+      } else {
+        //this isn't a known container.
+
+        log.error("Notified of completed container {} that is not in the list" +
+            " of active or failed containers", containerId);
+        completionOfUnknownContainerEvent.incrementAndGet();
+        result.unknownNode = true;
+      }
+    }
+
+    if (result.surplusNode) {
+      //a surplus node
+      return result;
+    }
+
+    //record the complete node's details; this pulls it from the livenode set 
+    //remove the node
+    ContainerId id = status.getContainerId();
+    log.info("Removing node ID {}", id);
+    RoleInstance node = getLiveContainers().remove(id);
+    if (node != null) {
+      node.state = STATE_DESTROYED;
+      node.exitCode = exitStatus;
+      node.diagnostics = status.getDiagnostics();
+      getCompletedContainers().put(id, node);
+      result.roleInstance = node;
+    } else {
+      // not in the list
+      log.warn("Received notification of completion of unknown node {}", id);
+      completionOfNodeNotInLiveListEvent.incrementAndGet();
+    }
+
+    // and the active node list if present
+    removeOwnedContainer(containerId);
+
+    // finally, verify the node doesn't exist any more
+    assert !containersBeingReleased.containsKey(
+        containerId) : "container still in release queue";
+    assert !getLiveContainers().containsKey(
+        containerId) : " container still in live nodes";
+    assert getOwnedContainer(containerId) ==
+           null : "Container still in active container list";
+
+    return result;
+  }
+
+  /**
+   * Get the URL log for a container
+   * @param c container
+   * @return the URL or "" if it cannot be determined
+   */
+  protected String getLogsURLForContainer(Container c) {
+    if (c==null) {
+      return null;
+    }
+    String user = null;
+    try {
+      user = SliderUtils.getCurrentUser().getShortUserName();
+    } catch (IOException ignored) {
+    }
+    String completedLogsUrl = "";
+    String url = logServerURL;
+    if (user != null && SliderUtils.isSet(url)) {
+      completedLogsUrl = url
+          + "/" + c.getNodeId() + "/" + c.getId() + "/ctx/" + user;
+    }
+    return completedLogsUrl;
+  }
+
+  /**
+   * Return the percentage done that Slider is to have YARN display in its
+   * Web UI
+   * @return an number from 0 to 100
+   */
+  public synchronized float getApplicationProgressPercentage() {
+    float percentage;
+    long desired = 0;
+    float actual = 0;
+    for (RoleStatus role : getRoleStatusMap().values()) {
+      desired += role.getDesired();
+      actual += role.getActual();
+    }
+    if (desired == 0) {
+      percentage = 100;
+    } else {
+      percentage = actual / desired;
+    }
+    return percentage;
+  }
+
+  /**
+   * Update the cluster description with the current application state
+   */
+
+  public ClusterDescription refreshClusterStatus() {
+    return refreshClusterStatus(null);
+  }
+
+  /**
+   * Update the cluster description with the current application state
+   * @param providerStatus status from the provider for the cluster info section
+   */
+  public synchronized ClusterDescription refreshClusterStatus(Map<String, String> providerStatus) {
+    ClusterDescription cd = getClusterStatus();
+    long now = now();
+    cd.setInfoTime(StatusKeys.INFO_STATUS_TIME_HUMAN,
+                   StatusKeys.INFO_STATUS_TIME_MILLIS,
+                   now);
+    if (providerStatus != null) {
+      for (Map.Entry<String, String> entry : providerStatus.entrySet()) {
+        cd.setInfo(entry.getKey(), entry.getValue());
+      }
+    }
+    MapOperations infoOps = new MapOperations("info", cd.info);
+    infoOps.mergeWithoutOverwrite(applicationInfo);
+    SliderUtils.addBuildInfo(infoOps, "status");
+    cd.statistics = new HashMap<>();
+
+    // build the map of node -> container IDs
+    Map<String, List<String>> instanceMap = createRoleToInstanceMap();
+    cd.instances = instanceMap;
+    
+    //build the map of node -> containers
+    Map<String, Map<String, ClusterNode>> clusterNodes =
+      createRoleToClusterNodeMap();
+    log.info("app state clusterNodes {} ", clusterNodes.toString());
+    cd.status = new HashMap<>();
+    cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, clusterNodes);
+
+
+    for (RoleStatus role : getRoleStatusMap().values()) {
+      String rolename = role.getName();
+      if (hasUniqueNames(instanceDefinition.getResourceOperations(),
+          role.getGroup())) {
+        cd.setRoleOpt(rolename, COMPONENT_PRIORITY, role.getPriority());
+        cd.setRoleOpt(rolename, ROLE_GROUP, role.getGroup());
+        MapOperations groupOptions = instanceDefinition.getResourceOperations()
+            .getComponent(role.getGroup());
+        SliderUtils.mergeMapsIgnoreDuplicateKeys(cd.getRole(rolename),
+            groupOptions.options);
+      }
+      List<String> instances = instanceMap.get(rolename);
+      int nodeCount = instances != null ? instances.size(): 0;
+      cd.setRoleOpt(rolename, COMPONENT_INSTANCES,
+                    role.getDesired());
+      cd.setRoleOpt(rolename, ROLE_ACTUAL_INSTANCES, nodeCount);
+      cd.setRoleOpt(rolename, ROLE_REQUESTED_INSTANCES, role.getRequested());
+      cd.setRoleOpt(rolename, ROLE_RELEASING_INSTANCES, role.getReleasing());
+      cd.setRoleOpt(rolename, ROLE_FAILED_INSTANCES, role.getFailed());
+      cd.setRoleOpt(rolename, ROLE_FAILED_STARTING_INSTANCES, role.getStartFailed());
+      cd.setRoleOpt(rolename, ROLE_FAILED_RECENTLY_INSTANCES, role.getFailedRecently());
+      cd.setRoleOpt(rolename, ROLE_NODE_FAILED_INSTANCES, role.getNodeFailed());
+      cd.setRoleOpt(rolename, ROLE_PREEMPTED_INSTANCES, role.getPreempted());
+      if (role.isAntiAffinePlacement()) {
+        cd.setRoleOpt(rolename, ROLE_PENDING_AA_INSTANCES, role.getPendingAntiAffineRequests());
+      }
+      Map<String, Integer> stats = role.buildStatistics();
+      cd.statistics.put(rolename, stats);
+    }
+
+    Map<String, Integer> sliderstats = getLiveStatistics();
+    cd.statistics.put(SliderKeys.COMPONENT_AM, sliderstats);
+
+    // liveness
+    cd.liveness = getApplicationLivenessInformation();
+
+    return cd;
+  }
+
+  /**
+   * get application liveness information
+   * @return a snapshot of the current liveness information
+   */  
+  public ApplicationLivenessInformation getApplicationLivenessInformation() {
+    ApplicationLivenessInformation li = new ApplicationLivenessInformation();
+    RoleStatistics stats = getRoleStatistics();
+    int outstanding = (int)(stats.desired - stats.actual);
+    li.requestsOutstanding = outstanding;
+    li.allRequestsSatisfied = outstanding <= 0;
+    li.activeRequests = (int)stats.requested;
+    return li;
+  }
+
+  /**
+   * Get the live statistics map
+   * @return a map of statistics values, defined in the {@link StatusKeys}
+   * keylist.
+   */
+  protected Map<String, Integer> getLiveStatistics() {
+    Map<String, Integer> sliderstats = new HashMap<>();
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE,
+        liveNodes.size());
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED,
+        completedContainerCount.intValue());
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED,
+        failedContainerCount.intValue());
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED,
+        startedContainers.intValue());
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED,
+         startFailedContainerCount.intValue());
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS,
+        surplusContainers.intValue());
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED,
+        completionOfUnknownContainerEvent.get());
+    return sliderstats;
+  }
+
+  /**
+   * Get the aggregate statistics across all roles
+   * @return role statistics
+   */
+  public RoleStatistics getRoleStatistics() {
+    RoleStatistics stats = new RoleStatistics();
+    for (RoleStatus role : getRoleStatusMap().values()) {
+      stats.add(role.getStatistics());
+    }
+    return stats;
+  }
+
+  /**
+   * Get a snapshot of component information.
+   * <p>
+   *   This does <i>not</i> include any container list, which 
+   *   is more expensive to create.
+   * @return a map of current role status values.
+   */
+  public Map<String, ComponentInformation> getComponentInfoSnapshot() {
+
+    Map<Integer, RoleStatus> statusMap = getRoleStatusMap();
+    Map<String, ComponentInformation> results = new HashMap<>(
+            statusMap.size());
+
+    for (RoleStatus status : statusMap.values()) {
+      String name = status.getName();
+      ComponentInformation info = status.serialize();
+      results.put(name, info);
+    }
+    return results;
+  }
+
+  /**
+   * Look at where the current node state is -and whether it should be changed
+   */
+  public synchronized List<AbstractRMOperation> reviewRequestAndReleaseNodes()
+      throws SliderInternalStateException, TriggerClusterTeardownException {
+    log.debug("in reviewRequestAndReleaseNodes()");
+    List<AbstractRMOperation> allOperations = new ArrayList<>();
+    for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+      if (!roleStatus.isExcludeFromFlexing()) {
+        List<AbstractRMOperation> operations = reviewOneRole(roleStatus);
+        allOperations.addAll(operations);
+      }
+    }
+    return allOperations;
+  }
+
+  /**
+   * Check the "recent" failure threshold for a role
+   * @param role role to examine
+   * @throws TriggerClusterTeardownException if the role
+   * has failed too many times
+   */
+  private void checkFailureThreshold(RoleStatus role)
+      throws TriggerClusterTeardownException {
+    long failures = role.getFailedRecently();
+    int threshold = getFailureThresholdForRole(role);
+    if (log.isDebugEnabled() && failures > 0) {
+      log.debug("Failure count of component: {}: {}, threshold={}",
+          role.getName(), failures, threshold);
+    }
+
+    if (failures > threshold) {
+      throw new TriggerClusterTeardownException(
+        SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
+          FinalApplicationStatus.FAILED, ErrorStrings.E_UNSTABLE_CLUSTER +
+        " - failed with component %s failed 'recently' %d times (%d in startup);" +
+        " threshold is %d - last failure: %s",
+          role.getName(),
+        role.getFailed(),
+        role.getStartFailed(),
+          threshold,
+        role.getFailureMessage());
+    }
+  }
+
+  /**
+   * Get the failure threshold for a specific role, falling back to
+   * the global one if not
+   * @param roleStatus role
+   * @return the threshold for failures
+   */
+  private int getFailureThresholdForRole(RoleStatus roleStatus) {
+    ConfTreeOperations resources =
+        instanceDefinition.getResourceOperations();
+    return resources.getComponentOptInt(roleStatus.getGroup(),
+        CONTAINER_FAILURE_THRESHOLD,
+        failureThreshold);
+  }
+
+  /**
+   * Get the node failure threshold for a specific role, falling back to
+   * the global one if not
+   * @param roleGroup role group
+   * @return the threshold for failures
+   */
+  private int getNodeFailureThresholdForRole(String roleGroup) {
+    ConfTreeOperations resources =
+        instanceDefinition.getResourceOperations();
+    return resources.getComponentOptInt(roleGroup,
+                                        NODE_FAILURE_THRESHOLD,
+                                        nodeFailureThreshold);
+  }
+
+  /**
+   * Reset the "recent" failure counts of all roles
+   */
+  public void resetFailureCounts() {
+    for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+      long failed = roleStatus.resetFailedRecently();
+      log.info("Resetting failure count of {}; was {}",
+               roleStatus.getName(),
+          failed);
+    }
+    roleHistory.resetFailedRecently();
+  }
+
+  /**
+   * Escalate operation as triggered by external timer.
+   * @return a (usually empty) list of cancel/request operations.
+   */
+  public List<AbstractRMOperation> escalateOutstandingRequests() {
+    return roleHistory.escalateOutstandingRequests();
+  }
+
+  /**
+   * Cancel any outstanding AA Requests, building up the list of ops to
+   * cancel, removing them from RoleHistory structures and the RoleStatus
+   * entries.
+   * @return a (usually empty) list of cancel/request operations.
+   */
+  public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+    // get the list of cancel operations
+    List<AbstractRMOperation> operations = roleHistory.cancelOutstandingAARequests();
+    for (RoleStatus roleStatus : roleStatusMap.values()) {
+      if (roleStatus.isAARequestOutstanding()) {
+        log.info("Cancelling outstanding AA request for {}", roleStatus);
+        roleStatus.cancelOutstandingAARequest();
+      }
+    }
+    return operations;
+  }
+
+  /**
+   * Look at the allocation status of one role, and trigger add/release
+   * actions if the number of desired role instances doesn't equal 
+   * (actual + pending).
+   * <p>
+   * MUST be executed from within a synchronized method
+   * <p>
+   * @param role role
+   * @return a list of operations
+   * @throws SliderInternalStateException if the operation reveals that
+   * the internal state of the application is inconsistent.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
+      throws SliderInternalStateException, TriggerClusterTeardownException {
+    List<AbstractRMOperation> operations = new ArrayList<>();
+    long delta;
+    long expected;
+    String name = role.getName();
+    synchronized (role) {
+      delta = role.getDelta();
+      expected = role.getDesired();
+    }
+
+    log.info("Reviewing {} : ", role);
+    log.debug("Expected {}, Delta: {}", expected, delta);
+    checkFailureThreshold(role);
+
+    if (expected < 0 ) {
+      // negative value: fail
+      throw new TriggerClusterTeardownException(
+          SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
+          FinalApplicationStatus.FAILED,
+          "Negative component count of %d desired for component %s",
+          expected, role);
+    }
+
+    if (delta > 0) {
+      // more workers needed than we have -ask for more
+      log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected);
+
+      if (role.isAntiAffinePlacement()) {
+        long pending = delta;
+        if (roleHistory.canPlaceAANodes()) {
+          // build one only if there is none outstanding, the role history knows
+          // enough about the cluster to ask, and there is somewhere to place
+          // the node
+          if (!role.isAARequestOutstanding()) {
+            // no outstanding AA; try to place things
+            AMRMClient.ContainerRequest request = createAAContainerRequest(role);
+            if (request != null) {
+              pending--;
+              log.info("Starting an anti-affine request sequence for {} nodes; pending={}",
+                delta, pending);
+              addContainerRequest(operations, request);
+            } else {
+              log.info("No location for anti-affine request");
+            }
+          }
+        } else {
+          log.warn("Awaiting node map before generating anti-affinity requests");
+        }
+        log.info("Setting pending to {}", pending);
+        role.setPendingAntiAffineRequests(pending);
+      } else {
+
+        for (int i = 0; i < delta; i++) {
+          //get the role history to select a suitable node, if available
+          addContainerRequest(operations, createContainerRequest(role));
+        }
+      }
+    } else if (delta < 0) {
+      log.info("{}: Asking for {} fewer node(s) for a total of {}", name,
+               -delta,
+               expected);
+      // reduce the number expected (i.e. subtract the delta)
+      long excess = -delta;
+
+      // how many requests are outstanding? for AA roles, this includes pending
+      long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests();
+      if (outstandingRequests > 0) {
+        // outstanding requests.
+        int toCancel = (int)Math.min(outstandingRequests, excess);
+
+        // Delegate to Role History
+        List<AbstractRMOperation> cancellations = roleHistory.cancelRequestsForRole(role, toCancel);
+        log.info("Found {} outstanding requests to cancel", cancellations.size());
+        operations.addAll(cancellations);
+        if (toCancel != cancellations.size()) {
+          log.error("Tracking of outstanding requests is not in sync with the summary statistics:" +
+              " expected to be able to cancel {} requests, but got {}",
+              toCancel, cancellations.size());
+        }
+
+        role.cancel(toCancel);
+        excess -= toCancel;
+        assert excess >= 0 : "Attempted to cancel too many requests";
+        log.info("Submitted {} cancellations, leaving {} to release",
+            toCancel, excess);
+        if (excess == 0) {
+          log.info("After cancelling requests, application is now at desired size");
+        }
+      }
+
+      // after the cancellation there may be no excess
+      if (excess > 0) {
+
+        // there's an excess, so more to cancel
+        // get the nodes to release
+        int roleId = role.getKey();
+
+        // enum all active nodes that aren't being released
+        List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true);
+        if (containersToRelease.isEmpty()) {
+          log.info("No containers for component {}", roleId);
+        }
+
+        // filter out all release-in-progress nodes
+        ListIterator<RoleInstance> li = containersToRelease.listIterator();
+        while (li.hasNext()) {
+          RoleInstance next = li.next();
+          if (next.released) {
+            li.remove();
+          }
+        }
+
+        // warn if the desired state can't be reached
+        int numberAvailableForRelease = containersToRelease.size();
+        if (numberAvailableForRelease < excess) {
+          log.warn("Not enough containers to release, have {} and need {} more",
+              numberAvailableForRelease,
+              excess - numberAvailableForRelease);
+        }
+
+        // ask the release selector to sort the targets
+        containersToRelease =  containerReleaseSelector.sortCandidates(
+            roleId,
+            containersToRelease);
+
+        // crop to the excess
+        List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease)
+            ? containersToRelease.subList(0, (int)excess)
+            : containersToRelease;
+
+        // then build up a release operation, logging each container as released
+        for (RoleInstance possible : finalCandidates) {
+          log.info("Targeting for release: {}", possible);
+          containerReleaseSubmitted(possible.container);
+          operations.add(new ContainerReleaseOperation(possible.getId()));
+        }
+      }
+
+    } else {
+      // actual + requested == desired
+      // there's a special case here: clear all pending AA requests
+      if (role.getPendingAntiAffineRequests() > 0) {
+        log.debug("Clearing outstanding pending AA requests");
+        role.setPendingAntiAffineRequests(0);
+      }
+    }
+
+    // there's now a list of operations to execute
+    log.debug("operations scheduled: {}; updated role: {}", operations.size(), role);
+    return operations;
+  }
+
+  /**
+   * Add a container request if the request is non-null
+   * @param operations operations to add the entry to
+   * @param containerAsk what to ask for
+   * @return true if a request was added
+   */
+  private boolean addContainerRequest(List<AbstractRMOperation> operations,
+      AMRMClient.ContainerRequest containerAsk) {
+    if (containerAsk != null) {
+      log.info("Container ask is {} and label = {}", containerAsk,
+          containerAsk.getNodeLabelExpression());
+      int askMemory = containerAsk.getCapability().getMemory();
+      if (askMemory > this.containerMaxMemory) {
+        log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
+      }
+      operations.add(new ContainerRequestOperation(containerAsk));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Releases a container based on container id
+   * @param containerId
+   * @return
+   * @throws SliderInternalStateException
+   */
+  public List<AbstractRMOperation> releaseContainer(ContainerId containerId)
+      throws SliderInternalStateException {
+    List<AbstractRMOperation> operations = new ArrayList<AbstractRMOperation>();
+    List<RoleInstance> activeRoleInstances = cloneOwnedContainerList();
+    for (RoleInstance role : activeRoleInstances) {
+      if (role.container.getId().equals(containerId)) {
+        containerReleaseSubmitted(role.container);
+        operations.add(new ContainerReleaseOperation(role.getId()));
+      }
+    }
+
+    return operations;
+  }
+
+  /**
+   * Find a container running on a specific host -looking
+   * into the node ID to determine this.
+   *
+   * @param node node
+   * @param roleId role the container must be in
+   * @return a container or null if there are no containers on this host
+   * that can be released.
+   */
+  private RoleInstance findRoleInstanceOnHost(NodeInstance node, int roleId) {
+    Collection<RoleInstance> targets = cloneOwnedContainerList();
+    String hostname = node.hostname;
+    for (RoleInstance ri : targets) {
+      if (hostname.equals(RoleHistoryUtils.hostnameOf(ri.container))
+                         && ri.roleId == roleId
+        && containersBeingReleased.get(ri.getContainerId()) == null) {
+        return ri;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Release all containers.
+   * @return a list of operations to execute
+   */
+  public synchronized List<AbstractRMOperation> releaseAllContainers() {
+
+    Collection<RoleInstance> targets = cloneOwnedContainerList();
+    log.info("Releasing {} containers", targets.size());
+    List<AbstractRMOperation> operations =
+      new ArrayList<>(targets.size());
+    for (RoleInstance instance : targets) {
+      if (instance.roleId == SliderKeys.ROLE_AM_PRIORITY_INDEX) {
+        // don't worry about the AM
+        continue;
+      }
+      Container possible = instance.container;
+      ContainerId id = possible.getId();
+      if (!instance.released) {
+        String url = getLogsURLForContainer(possible);
+        log.info("Releasing container. Log: " + url);
+        try {
+          containerReleaseSubmitted(possible);
+        } catch (SliderInternalStateException e) {
+          log.warn("when releasing container {} :", possible, e);
+        }
+        operations.add(new ContainerReleaseOperation(id));
+      }
+    }
+    return operations;
+  }
+
+  /**
+   * Event handler for allocated containers: builds up the lists
+   * of assignment actions (what to run where), and possibly
+   * a list of operations to perform
+   * @param allocatedContainers the containers allocated
+   * @param assignments the assignments of roles to containers
+   * @param operations any allocation or release operations
+   */
+  public synchronized void onContainersAllocated(List<Container> allocatedContainers,
+                                    List<ContainerAssignment> assignments,
+                                    List<AbstractRMOperation> operations) {
+    assignments.clear();
+    operations.clear();
+    List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers);
+    log.debug("onContainersAllocated(): Total containers allocated = {}", ordered.size());
+    for (Container container : ordered) {
+      final NodeId nodeId = container.getNodeId();
+      String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort();
+      //get the role
+      final ContainerId cid = container.getId();
+      final RoleStatus role = lookupRoleStatus(container);
+
+      //dec requested count
+      role.decRequested();
+
+      //inc allocated count -this may need to be dropped in a moment,
+      // but us needed to update the logic below
+      final long allocated = role.incActual();
+      final long desired = role.getDesired();
+
+      final String roleName = role.getName();
+      final ContainerAllocationResults allocation =
+          roleHistory.onContainerAllocated(container, desired, allocated);
+      final ContainerAllocationOutcome outcome = allocation.outcome;
+
+      // add all requests to the operations list
+      operations.addAll(allocation.operations);
+
+      //look for condition where we get more back than we asked
+      if (allocated > desired) {
+        log.info("Discarding surplus {} container {} on {}", roleName,  cid, containerHostInfo);
+        operations.add(new ContainerReleaseOperation(cid));
+        //register as a surplus node
+        surplusNodes.add(cid);
+        surplusContainers.inc();
+        //and, as we aren't binding it to role, dec that role's actual count
+        role.decActual();
+      } else {
+
+        // Allocation being accepted -so decrement the number of outstanding requests
+        decOutstandingContainerRequests();
+
+        log.info("Assigning role {} to container" +
+                 " {}," +
+                 " on {}:{},",
+                 roleName,
+                 cid,
+                 nodeId.getHost(),
+                 nodeId.getPort());
+
+        assignments.add(new ContainerAssignment(container, role, outcome));
+        //add to the history
+        roleHistory.onContainerAssigned(container);
+        // now for AA requests, add some more
+        if (role.isAntiAffinePlacement()) {
+          role.completeOutstandingAARequest();
+          // check invariants. The new node must become unavailable.
+          NodeInstance node = roleHistory.getOrCreateNodeInstance(container);
+          if (node.canHost(role.getKey(), role.getLabelExpression())) {
+            log.error("Assigned node still declares as available {}", node.toFullString() );
+          }
+          if (role.getPendingAntiAffineRequests() > 0) {
+            // still an outstanding AA request: need to issue a new one.
+            log.info("Asking for next container for AA role {}", roleName);
+            if (!addContainerRequest(operations, createAAContainerRequest(role))) {
+              log.info("No capacity in cluster for new requests");
+            } else {
+              role.decPendingAntiAffineRequests();
+            }
+            log.debug("Current AA role status {}", role);
+          } else {
+            log.info("AA request sequence completed for role {}", role);
+          }
+        }
+
+      }
+    }
+  }
+
+  /**
+   * Get diagnostics info about containers
+   */
+  public String getContainerDiagnosticInfo() {
+    StringBuilder builder = new StringBuilder();
+    for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+      builder.append(roleStatus).append('\n');
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Event handler for the list of active containers on restart.
+   * Sets the info key {@link StatusKeys#INFO_CONTAINERS_AM_RESTART}
+   * to the size of the list passed down (and does not set it if none were)
+   * @param liveContainers the containers allocated
+   * @return true if a rebuild took place (even if size 0)
+   * @throws RuntimeException on problems
+   */
+  private boolean rebuildModelFromRestart(List<Container> liveContainers)
+      throws BadClusterStateException {
+    if (liveContainers == null) {
+      return false;
+    }
+    for (Container container : liveContainers) {
+      addRestartedContainer(container);
+    }
+    clusterStatus.setInfo(StatusKeys.INFO_CONTAINERS_AM_RESTART,
+                               Integer.toString(liveContainers.size()));
+    return true;
+  }
+
+  /**
+   * Add a restarted container by walking it through the create/submit/start
+   * lifecycle, so building up the internal structures
+   * @param container container that was running before the AM restarted
+   * @throws RuntimeException on problems
+   */
+  private void addRestartedContainer(Container container)
+      throws BadClusterStateException {
+    String containerHostInfo = container.getNodeId().getHost()
+                               + ":" +
+                               container.getNodeId().getPort();
+    // get the container ID
+    ContainerId cid = container.getId();
+    
+    // get the role
+    int roleId = ContainerPriority.extractRole(container);
+    RoleStatus role =
+      lookupRoleStatus(roleId);
+    // increment

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org