You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/13 22:53:03 UTC
[11/74] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
new file mode 100644
index 0000000..16c2435
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -0,0 +1,2489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterDescriptionKeys;
+import org.apache.slider.api.ClusterDescriptionOperations;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.StatusKeys;
+import org.apache.slider.api.types.ApplicationLivenessInformation;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.RoleStatistics;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.exceptions.SliderInternalStateException;
+import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
+import org.apache.slider.core.persist.AggregateConfSerDeser;
+import org.apache.slider.core.persist.ConfTreeSerDeser;
+import org.apache.slider.providers.PlacementPolicy;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.LongGauge;
+import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
+import org.apache.slider.server.appmaster.management.MetricsConstants;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
+import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.api.RoleKeys.*;
+import static org.apache.slider.api.StateValues.*;
+
+/**
+ * The model of all the ongoing state of a Slider AM.
+ *
+ * concurrency rules: any method which begins with <i>build</i>
+ * is not synchronized and intended to be used during
+ * initialization.
+ */
+public class AppState {
+ protected static final Logger log =
+ LoggerFactory.getLogger(AppState.class);
+
+ /** Factory used to create YARN records (e.g. Resource instances). */
+ private final AbstractClusterServices recordFactory;
+
+ /** Metrics/monitoring service; gauges and role metric sets are registered here. */
+ private final MetricsAndMonitoring metricsAndMonitoring;
+
+ /**
+ * Flag set to indicate the application is live -this only happens
+ * after the buildInstance operation
+ */
+ private boolean applicationLive = false;
+
+ /**
+ * The definition of the instance. Flexing updates the resources section
+ * This is used as a synchronization point on activities that update
+ * the CD, and also to update some of the structures that
+ * feed in to the CD
+ */
+ private AggregateConf instanceDefinition;
+
+ /**
+ * Time the instance definition snapshots were created
+ */
+ private long snapshotTime;
+
+ /**
+ * Snapshot of the instance definition. This is fully
+ * resolved.
+ */
+ private AggregateConf instanceDefinitionSnapshot;
+
+ /**
+ * Snapshot of the raw instance definition; unresolved and
+ * without any patch of an AM into it.
+ */
+ private AggregateConf unresolvedInstanceDefinition;
+
+ /**
+ * snapshot of resources as of last update time
+ */
+ private ConfTreeOperations resourcesSnapshot;
+ /** Snapshot of the application configuration as of last update time. */
+ private ConfTreeOperations appConfSnapshot;
+ /** Snapshot of the internal configuration as of last update time. */
+ private ConfTreeOperations internalsSnapshot;
+
+ /**
+ * This is the status, the live model
+ */
+ private ClusterDescription clusterStatus = new ClusterDescription();
+
+ /**
+ * Metadata provided by the AM for use in filling in status requests
+ */
+ private Map<String, String> applicationInfo;
+
+ /**
+ * Client properties created via the provider -static for the life
+ * of the application
+ */
+ private Map<String, String> clientProperties = new HashMap<>();
+
+ /**
+ * This is a template of the cluster status
+ */
+ private ClusterDescription clusterStatusTemplate = new ClusterDescription();
+
+ /** Role priority (ID) to role status; sorted map gives deterministic iteration order. */
+ private final Map<Integer, RoleStatus> roleStatusMap =
+ new ConcurrentSkipListMap<>();
+
+ /** Role name to provider role definition. */
+ private final Map<String, ProviderRole> roles =
+ new ConcurrentHashMap<>();
+
+ /** Role priority to provider role; sorted so lastKey() yields the highest priority in use. */
+ private final ConcurrentSkipListMap<Integer, ProviderRole> rolePriorityMap =
+ new ConcurrentSkipListMap<>();
+
+ /**
+ * The master node.
+ */
+ private RoleInstance appMasterNode;
+
+ /**
+ * Hash map of the containers we have. This includes things that have
+ * been allocated but are not live; it is a superset of the live list
+ */
+ private final ConcurrentMap<ContainerId, RoleInstance> ownedContainers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Hash map of the containers we have released, but we
+ * are still awaiting acknowledgements on. Any failure of these
+ * containers is treated as a successful outcome
+ */
+ private final ConcurrentMap<ContainerId, Container> containersBeingReleased =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Counter for completed containers ( complete denotes successful or failed )
+ */
+ private final LongGauge completedContainerCount = new LongGauge();
+
+ /**
+ * Count of failed containers
+ */
+ private final LongGauge failedContainerCount = new LongGauge();
+
+ /**
+ * # of started containers
+ */
+ private final LongGauge startedContainers = new LongGauge();
+
+ /**
+ * # of containers that failed to start
+ */
+ private final LongGauge startFailedContainerCount = new LongGauge();
+
+ /**
+ * Track the number of surplus containers received and discarded
+ */
+ private final LongGauge surplusContainers = new LongGauge();
+
+ /**
+ * Track the number of requested containers.
+ * Important: this does not include AA requests which are yet to be issued.
+ */
+ private final LongGauge outstandingContainerRequests = new LongGauge();
+
+ /**
+ * Map of requested nodes. This records the command used to start it,
+ * resources, etc. When container started callback is received,
+ * the node is promoted from here to the containerMap
+ */
+ private final Map<ContainerId, RoleInstance> startingContainers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * List of completed nodes. This isn't kept in the CD as it gets too
+ * big for the RPC responses. Indeed, we should think about how deep to get this
+ */
+ private final Map<ContainerId, RoleInstance> completedContainers
+ = new ConcurrentHashMap<>();
+
+ /**
+ * Nodes that failed to start.
+ * Again, kept out of the CD
+ */
+ private final Map<ContainerId, RoleInstance> failedContainers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Nodes that came assigned to a role above that
+ * which were asked for -this appears to happen
+ */
+ private final Set<ContainerId> surplusNodes = new HashSet<>();
+
+ /**
+ * Map of containerID to cluster nodes, for status reports.
+ * Access to this should be synchronized on the clusterDescription
+ */
+ private final Map<ContainerId, RoleInstance> liveNodes =
+ new ConcurrentHashMap<>();
+ /** Count of completion events for nodes that were not in the live list. */
+ private final AtomicInteger completionOfNodeNotInLiveListEvent =
+ new AtomicInteger();
+ /** Count of completion events for containers this AM does not know about. */
+ private final AtomicInteger completionOfUnknownContainerEvent =
+ new AtomicInteger();
+
+
+ /**
+ * limits of container core numbers in this queue
+ */
+ private int containerMaxCores;
+ private int containerMinCores;
+
+ /**
+ * limits of container memory in this queue
+ */
+ private int containerMaxMemory;
+ private int containerMinMemory;
+
+ /** History of role placement; created and started in buildInstance(). */
+ private RoleHistory roleHistory;
+ /** Provider configuration whose -site keys seed {@link #clientProperties}. */
+ private Configuration publishedProviderConf;
+ /** Threshold for a container lifetime to count as "short lived"; set from
+ * INTERNAL_CONTAINER_FAILURE_SHORTLIFE (units per that key's contract). */
+ private long startTimeThreshold;
+
+ /** Container failure threshold; overridden from resources in buildInstance(). */
+ private int failureThreshold = 10;
+ /** Per-node failure threshold; overridden from resources in buildInstance(). */
+ private int nodeFailureThreshold = 3;
+
+ /** YARN log server URL, read from the service configuration in buildInstance(). */
+ private String logServerURL = "";
+
+ /**
+ * Selector of containers to release; application wide.
+ */
+ private ContainerReleaseSelector containerReleaseSelector;
+ /** Minimum allocatable container resource, built in setContainerLimits(). */
+ private Resource minResource;
+ /** Maximum allocatable container resource, built in setContainerLimits(). */
+ private Resource maxResource;
+
+ /**
+ * Create an instance
+ * @param recordFactory factory for YARN records
+ * @param metricsAndMonitoring metrics and monitoring services
+ */
+ public AppState(AbstractClusterServices recordFactory,
+ MetricsAndMonitoring metricsAndMonitoring) {
+ Preconditions.checkArgument(recordFactory != null, "null recordFactory");
+ Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring");
+ this.recordFactory = recordFactory;
+ this.metricsAndMonitoring = metricsAndMonitoring;
+
+ // register any metrics
+ register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests);
+ register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers);
+ register(MetricsConstants.CONTAINERS_STARTED, startedContainers);
+ register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount);
+ register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount);
+ register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount);
+ }
+
+ /**
+ * Register a metric in the metrics registry, qualified by this class's name.
+ * @param name short metric name
+ * @param counter the metric to register
+ */
+ private void register(String name, Metric counter) {
+ this.metricsAndMonitoring.getMetrics().register(
+ MetricRegistry.name(AppState.class, name), counter);
+ }
+
+ /**
+ * Get the count of failed containers.
+ * (The "Countainer" misspelling is part of the public API and is retained.)
+ * @return the failed container count
+ */
+ public long getFailedCountainerCount() {
+ return failedContainerCount.getCount();
+ }
+
+ /**
+ * Increment the count
+ */
+ public void incFailedCountainerCount() {
+ failedContainerCount.inc();
+ }
+
+ /** @return the count of containers that failed to start */
+ public long getStartFailedCountainerCount() {
+ return startFailedContainerCount.getCount();
+ }
+
+ /**
+ * Increment the started-container count. (Returns nothing; the previous
+ * javadoc claim of "return the new value" was wrong.)
+ */
+ public void incStartedCountainerCount() {
+ startedContainers.inc();
+ }
+
+ /** @return the count of started containers */
+ public long getStartedCountainerCount() {
+ return startedContainers.getCount();
+ }
+
+ /**
+ * Increment the start-failed container count. (Returns nothing; the previous
+ * javadoc claim of "return the new value" was wrong.)
+ */
+ public void incStartFailedCountainerCount() {
+ startFailedContainerCount.inc();
+ }
+
+ /** @return counter of completion events for nodes absent from the live list */
+ public AtomicInteger getCompletionOfNodeNotInLiveListEvent() {
+ return completionOfNodeNotInLiveListEvent;
+ }
+
+ /** @return counter of completion events for unknown containers */
+ public AtomicInteger getCompletionOfUnknownContainerEvent() {
+ return completionOfUnknownContainerEvent;
+ }
+
+
+ /** @return the live map of role priority to role status */
+ public Map<Integer, RoleStatus> getRoleStatusMap() {
+ return roleStatusMap;
+ }
+
+ /** @return the live map of role name to provider role */
+ protected Map<String, ProviderRole> getRoleMap() {
+ return roles;
+ }
+
+ /** @return the live map of role priority to provider role */
+ public Map<Integer, ProviderRole> getRolePriorityMap() {
+ return rolePriorityMap;
+ }
+
+ /** @return containers that have been requested but not yet started */
+ private Map<ContainerId, RoleInstance> getStartingContainers() {
+ return startingContainers;
+ }
+
+ /** @return containers that have completed (successfully or not) */
+ private Map<ContainerId, RoleInstance> getCompletedContainers() {
+ return completedContainers;
+ }
+
+ /** @return containers that failed to start */
+ public Map<ContainerId, RoleInstance> getFailedContainers() {
+ return failedContainers;
+ }
+
+ /** @return the live container map (containerID to role instance) */
+ public Map<ContainerId, RoleInstance> getLiveContainers() {
+ return liveNodes;
+ }
+
+ /**
+ * Get the current view of the cluster status.
+ * <p>
+ * Calls to {@link #refreshClusterStatus()} trigger a
+ * refresh of this field.
+ * <p>
+ * This is read-only
+ * to the extent that changes here do not trigger updates in the
+ * application state.
+ * @return the cluster status
+ */
+ public synchronized ClusterDescription getClusterStatus() {
+ return clusterStatus;
+ }
+
+ /** Replace the cluster status; exposed for tests (also used by initClusterStatus). */
+ @VisibleForTesting
+ protected synchronized void setClusterStatus(ClusterDescription clusterDesc) {
+ this.clusterStatus = clusterDesc;
+ }
+
+ /**
+ * Set the instance definition -this also builds the (now obsolete)
+ * cluster specification from it.
+ *
+ * Important: this is for early binding and must not be used after the build
+ * operation is complete.
+ * @param definition initial definition
+ * @throws BadConfigException if the definition fails to resolve
+ * @throws IOException on serialization problems while snapshotting
+ */
+ public synchronized void setInitialInstanceDefinition(AggregateConf definition)
+ throws BadConfigException, IOException {
+ log.debug("Setting initial instance definition");
+ // snapshot the definition
+ AggregateConfSerDeser serDeser = new AggregateConfSerDeser();
+
+ // two independent deep copies: one kept raw, one resolved/patched below
+ unresolvedInstanceDefinition = serDeser.fromInstance(definition);
+
+ this.instanceDefinition = serDeser.fromInstance(definition);
+ onInstanceDefinitionUpdated();
+ }
+
+ /** @return the (mutable, resolved) instance definition */
+ public synchronized AggregateConf getInstanceDefinition() {
+ return instanceDefinition;
+ }
+
+ /**
+ * Get the role history of the application
+ * @return the role history
+ */
+ @VisibleForTesting
+ public RoleHistory getRoleHistory() {
+ return roleHistory;
+ }
+
+ /**
+ * Get the path used for history files
+ * @return the directory used for history files
+ */
+ @VisibleForTesting
+ public Path getHistoryPath() {
+ return roleHistory.getHistoryPath();
+ }
+
+ /**
+ * Set the container limits -the min and max values for
+ * resource requests. All requests must be multiples of the min
+ * values.
+ * @param minMemory min memory MB
+ * @param maxMemory maximum memory
+ * @param minCores min v core count
+ * @param maxCores maximum cores
+ */
+ public void setContainerLimits(int minMemory,int maxMemory, int minCores, int maxCores) {
+ containerMinCores = minCores;
+ containerMaxCores = maxCores;
+ containerMinMemory = minMemory;
+ containerMaxMemory = maxMemory;
+ // cache Resource records for the two extremes for later normalization
+ minResource = recordFactory.newResource(containerMinMemory, containerMinCores);
+ maxResource = recordFactory.newResource(containerMaxMemory, containerMaxCores);
+ }
+
+ /** @return snapshot of the resources configuration as of the last update */
+ public ConfTreeOperations getResourcesSnapshot() {
+ return resourcesSnapshot;
+ }
+
+ /** @return snapshot of the application configuration as of the last update */
+ public ConfTreeOperations getAppConfSnapshot() {
+ return appConfSnapshot;
+ }
+
+ /** @return snapshot of the internal configuration as of the last update */
+ public ConfTreeOperations getInternalsSnapshot() {
+ return internalsSnapshot;
+ }
+
+ /** @return true once buildInstance() has completed */
+ public boolean isApplicationLive() {
+ return applicationLive;
+ }
+
+ /** @return time (from now()) at which the snapshots were taken */
+ public long getSnapshotTime() {
+ return snapshotTime;
+ }
+
+ /** @return fully-resolved snapshot of the instance definition */
+ public synchronized AggregateConf getInstanceDefinitionSnapshot() {
+ return instanceDefinitionSnapshot;
+ }
+
+ /** @return the raw, unresolved instance definition */
+ public AggregateConf getUnresolvedInstanceDefinition() {
+ return unresolvedInstanceDefinition;
+ }
+
+ /**
+ * Build up the application state from the binding information.
+ * This is the main initialization operation: it snapshots configurations,
+ * builds the role list (static and dynamic), sets thresholds, creates the
+ * role history and rebuilds state from any containers that survived an
+ * AM restart. On success {@link #applicationLive} is set.
+ * @param binding validated binding information
+ * @throws BadClusterStateException if the binding reflects a bad state
+ * @throws BadConfigException on configuration problems
+ * @throws IOException on serialization/filesystem problems
+ */
+ public synchronized void buildInstance(AppStateBindingInfo binding)
+ throws BadClusterStateException, BadConfigException, IOException {
+ binding.validate();
+
+ log.debug("Building application state");
+ publishedProviderConf = binding.publishedProviderConf;
+ applicationInfo = binding.applicationInfo != null ? binding.applicationInfo
+ : new HashMap<String, String>();
+
+ clientProperties = new HashMap<>();
+ containerReleaseSelector = binding.releaseSelector;
+
+
+ Set<String> confKeys = ConfigHelper.sortedConfigKeys(publishedProviderConf);
+
+ // Add the -site configuration properties
+ for (String key : confKeys) {
+ String val = publishedProviderConf.get(key);
+ clientProperties.put(key, val);
+ }
+
+ // set the cluster specification (once its dependency, the client
+ // properties, is out of the way)
+ setInitialInstanceDefinition(binding.instanceDefinition);
+
+ //build the initial role list
+ List<ProviderRole> roleList = new ArrayList<>(binding.roles);
+ for (ProviderRole providerRole : roleList) {
+ buildRole(providerRole);
+ }
+
+ ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+
+ // add dynamic roles: components in the resources config not already known
+ Set<String> roleNames = resources.getComponentNames();
+ for (String name : roleNames) {
+ if (roles.containsKey(name)) {
+ continue;
+ }
+ if (hasUniqueNames(resources, name)) {
+ // unique-name groups get their per-instance roles created later,
+ // in buildRoleRequirementsFromResources()
+ log.info("Skipping group {}", name);
+ continue;
+ }
+ // this is a new value
+ log.info("Adding role {}", name);
+ MapOperations resComponent = resources.getComponent(name);
+ ProviderRole dynamicRole = createDynamicProviderRole(name, resComponent);
+ buildRole(dynamicRole);
+ roleList.add(dynamicRole);
+ }
+ //then pick up the requirements
+ buildRoleRequirementsFromResources();
+
+ //set the livespan
+ MapOperations globalResOpts = instanceDefinition.getResourceOperations().getGlobalOptions();
+
+ startTimeThreshold = globalResOpts.getOptionInt(
+ InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE,
+ InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE);
+
+ failureThreshold = globalResOpts.getOptionInt(
+ CONTAINER_FAILURE_THRESHOLD,
+ DEFAULT_CONTAINER_FAILURE_THRESHOLD);
+ nodeFailureThreshold = globalResOpts.getOptionInt(
+ NODE_FAILURE_THRESHOLD,
+ DEFAULT_NODE_FAILURE_THRESHOLD);
+ initClusterStatus();
+
+
+ // set up the role history
+ roleHistory = new RoleHistory(roleStatusMap.values(), recordFactory);
+ roleHistory.register(metricsAndMonitoring);
+ roleHistory.onStart(binding.fs, binding.historyPath);
+ // trigger first node update
+ roleHistory.onNodesUpdated(binding.nodeReports);
+
+
+ //rebuild any live containers
+ rebuildModelFromRestart(binding.liveContainers);
+
+ // any am config options to pick up
+ logServerURL = binding.serviceConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, "");
+ //mark as live
+ applicationLive = true;
+ }
+
+ /**
+ * Initialize the live cluster status from the status template:
+ * copies the template, merges in the application info, stamps
+ * live/create times, and installs the result via setClusterStatus().
+ */
+ public void initClusterStatus() {
+ //copy into cluster status.
+ ClusterDescription status = ClusterDescription.copy(clusterStatusTemplate);
+ status.state = STATE_CREATED;
+ MapOperations infoOps = new MapOperations("info", status.info);
+ infoOps.mergeWithoutOverwrite(applicationInfo);
+ SliderUtils.addBuildInfo(infoOps, "status");
+
+ long now = now();
+ status.setInfoTime(StatusKeys.INFO_LIVE_TIME_HUMAN,
+ StatusKeys.INFO_LIVE_TIME_MILLIS,
+ now);
+ SliderUtils.setInfoTime(infoOps,
+ StatusKeys.INFO_LIVE_TIME_HUMAN,
+ StatusKeys.INFO_LIVE_TIME_MILLIS,
+ now);
+ // only set the creation time on first initialization
+ if (0 == status.createTime) {
+ status.createTime = now;
+ SliderUtils.setInfoTime(infoOps,
+ StatusKeys.INFO_CREATE_TIME_HUMAN,
+ StatusKeys.INFO_CREATE_TIME_MILLIS,
+ now);
+ }
+ status.state = STATE_LIVE;
+
+ //set the app state to this status
+ setClusterStatus(status);
+ }
+
+ /**
+ * Build a dynamic provider role; the group defaults to the role name.
+ * @param name name of role
+ * @param component component configuration supplying priority/placement options
+ * @return a new provider role
+ * @throws BadConfigException bad configuration
+ */
+ public ProviderRole createDynamicProviderRole(String name, MapOperations component)
+ throws BadConfigException {
+ return createDynamicProviderRole(name, name, component);
+ }
+
+ /**
+ * Build a dynamic provider role
+ * @param name name of role
+ * @param group group of role
+ * @param component component configuration supplying priority/placement options
+ * @return a new provider role
+ * @throws BadConfigException bad configuration (e.g. missing/invalid priority)
+ */
+ public ProviderRole createDynamicProviderRole(String name, String group, MapOperations component)
+ throws BadConfigException {
+ // priority is mandatory and must be >= 1
+ String priOpt = component.getMandatoryOption(COMPONENT_PRIORITY);
+ int priority = SliderUtils.parseAndValidate(
+ "value of " + name + " " + COMPONENT_PRIORITY, priOpt, 0, 1, -1);
+
+ // placement policy is optional, defaulting to PlacementPolicy.DEFAULT
+ String placementOpt = component.getOption(COMPONENT_PLACEMENT_POLICY,
+ Integer.toString(PlacementPolicy.DEFAULT));
+
+ int placement = SliderUtils.parseAndValidate(
+ "value of " + name + " " + COMPONENT_PLACEMENT_POLICY, placementOpt, 0, 0, -1);
+
+ int placementTimeout = component.getOptionInt(PLACEMENT_ESCALATE_DELAY,
+ DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS);
+
+ ProviderRole newRole = new ProviderRole(name,
+ group,
+ priority,
+ placement,
+ getNodeFailureThresholdForRole(group),
+ placementTimeout,
+ component.getOption(YARN_LABEL_EXPRESSION, DEF_YARN_LABEL_EXPRESSION));
+ log.info("New {} ", newRole);
+ return newRole;
+ }
+
+ /**
+ * Actions to perform when an instance definition is updated
+ * Currently:
+ * <ol>
+ * <li>
+ * resolve the configuration
+ * </li>
+ * <li>
+ * update the cluster spec derivative
+ * </li>
+ * </ol>
+ *
+ * @throws BadConfigException on resolution/build failure
+ * @throws IOException on serialization problems
+ */
+ private synchronized void onInstanceDefinitionUpdated()
+ throws BadConfigException, IOException {
+
+ log.debug("Instance definition updated");
+ //note the time
+ snapshotTime = now();
+
+ // ensure every component in resources has an appconf entry
+ for (String component : instanceDefinition.getResourceOperations().getComponentNames()) {
+ instanceDefinition.getAppConfOperations().getOrAddComponent(component);
+ }
+
+ // resolve references if not already done
+ instanceDefinition.resolve();
+
+ // force in the AM desired state values
+ ConfTreeOperations resources = instanceDefinition.getResourceOperations();
+
+ // the AM component, if present, always has exactly one instance
+ if (resources.getComponent(SliderKeys.COMPONENT_AM) != null) {
+ resources.setComponentOpt(
+ SliderKeys.COMPONENT_AM, COMPONENT_INSTANCES, "1");
+ }
+
+
+ //snapshot all three sections
+ resourcesSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getResources());
+ appConfSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getAppConf());
+ internalsSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getInternal());
+ //build a new aggregate from the snapshots
+ instanceDefinitionSnapshot = new AggregateConf(resourcesSnapshot.confTree,
+ appConfSnapshot.confTree,
+ internalsSnapshot.confTree);
+ instanceDefinitionSnapshot.setName(instanceDefinition.getName());
+
+ clusterStatusTemplate = ClusterDescriptionOperations.buildFromInstanceDefinition(
+ instanceDefinition);
+
+ // Add the -site configuration properties
+ for (Map.Entry<String, String> prop : clientProperties.entrySet()) {
+ clusterStatusTemplate.clientProperties.put(prop.getKey(), prop.getValue());
+ }
+
+ }
+
+ /**
+ * The resource configuration is updated -review and update state.
+ * @param resources updated resources specification
+ * @return a list of any dynamically added provider roles
+ * (purely for testing purposes)
+ * @throws BadConfigException on configuration problems
+ * @throws IOException on serialization problems
+ */
+ @VisibleForTesting
+ public synchronized List<ProviderRole> updateResourceDefinitions(ConfTree resources)
+ throws BadConfigException, IOException {
+ log.debug("Updating resources to {}", resources);
+ // snapshot the (possibly unresolved) values
+ ConfTreeSerDeser serDeser = new ConfTreeSerDeser();
+ unresolvedInstanceDefinition.setResources(
+ serDeser.fromInstance(resources));
+ // assign another copy under the instance definition for resolving
+ // and then driving application size
+ instanceDefinition.setResources(serDeser.fromInstance(resources));
+ onInstanceDefinitionUpdated();
+
+ // propagate the role table
+ Map<String, Map<String, String>> updated = resources.components;
+ getClusterStatus().roles = SliderUtils.deepClone(updated);
+ getClusterStatus().updateTime = now();
+ return buildRoleRequirementsFromResources();
+ }
+
+ /**
+ * build the role requirements from the cluster specification
+ * @return a list of any dynamically added provider roles
+ * @throws BadConfigException on invalid counts or component configs
+ */
+ private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException {
+
+ List<ProviderRole> newRoles = new ArrayList<>(0);
+
+ // now update every role's desired count.
+ // if there are no instance values, that role count goes to zero
+
+ ConfTreeOperations resources =
+ instanceDefinition.getResourceOperations();
+
+ // Add all the existing roles
+ // groupCounts tracks, per unique-name group, how many instances
+ // are already covered by existing per-instance roles
+ Map<String, Integer> groupCounts = new HashMap<>();
+ for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+ if (roleStatus.isExcludeFromFlexing()) {
+ // skip inflexible roles, e.g AM itself
+ continue;
+ }
+ long currentDesired = roleStatus.getDesired();
+ String role = roleStatus.getName();
+ String roleGroup = roleStatus.getGroup();
+ int desiredInstanceCount = getDesiredInstanceCount(resources, roleGroup);
+
+ int newDesired = desiredInstanceCount;
+ if (hasUniqueNames(resources, roleGroup)) {
+ Integer groupCount = 0;
+ if (groupCounts.containsKey(roleGroup)) {
+ groupCount = groupCounts.get(roleGroup);
+ }
+
+ // each per-instance role of a unique-name group is desired at
+ // most once; anything beyond the group total is zero
+ newDesired = desiredInstanceCount - groupCount;
+
+ if (newDesired > 0) {
+ newDesired = 1;
+ groupCounts.put(roleGroup, groupCount + newDesired);
+ } else {
+ newDesired = 0;
+ }
+ }
+
+ if (newDesired == 0) {
+ log.info("Role {} has 0 instances specified", role);
+ }
+ if (currentDesired != newDesired) {
+ log.info("Role {} flexed from {} to {}", role, currentDesired,
+ newDesired);
+ roleStatus.setDesired(newDesired);
+ }
+ }
+
+ // now the dynamic ones. Iterate through the cluster spec and
+ // add any role status entries not in the role status
+ Set<String> roleNames = resources.getComponentNames();
+ for (String name : roleNames) {
+ if (roles.containsKey(name)) {
+ continue;
+ }
+ if (hasUniqueNames(resources, name)) {
+ // THIS NAME IS A GROUP
+ int desiredInstanceCount = getDesiredInstanceCount(resources, name);
+ Integer groupCount = 0;
+ if (groupCounts.containsKey(name)) {
+ groupCount = groupCounts.get(name);
+ }
+ // create one uniquely-named role per remaining desired instance
+ for (int i = groupCount + 1; i <= desiredInstanceCount; i++) {
+ int priority = resources.getComponentOptInt(name, COMPONENT_PRIORITY, i);
+ // this is a new instance of an existing group
+ String newName = String.format("%s%d", name, i);
+ int newPriority = getNewPriority(priority + i - 1);
+ log.info("Adding new role {}", newName);
+ MapOperations component = resources.getComponent(name,
+ Collections.singletonMap(COMPONENT_PRIORITY,
+ Integer.toString(newPriority)));
+ if (component == null) {
+ throw new BadConfigException("Component is null for name = " + name
+ + ", newPriority =" + newPriority);
+ }
+ ProviderRole dynamicRole = createDynamicProviderRole(newName, name, component);
+ RoleStatus roleStatus = buildRole(dynamicRole);
+ roleStatus.setDesired(1);
+ log.info("New role {}", roleStatus);
+ if (roleHistory != null) {
+ roleHistory.addNewRole(roleStatus);
+ }
+ newRoles.add(dynamicRole);
+ }
+ } else {
+ // this is a new value
+ log.info("Adding new role {}", name);
+ MapOperations component = resources.getComponent(name);
+ ProviderRole dynamicRole = createDynamicProviderRole(name, component);
+ RoleStatus roleStatus = buildRole(dynamicRole);
+ roleStatus.setDesired(getDesiredInstanceCount(resources, name));
+ log.info("New role {}", roleStatus);
+ if (roleHistory != null) {
+ roleHistory.addNewRole(roleStatus);
+ }
+ newRoles.add(dynamicRole);
+ }
+ }
+ // and fill in all those roles with their requirements
+ buildRoleResourceRequirements();
+
+ return newRoles;
+ }
+
+ /**
+ * Choose a priority for a new dynamic role: the requested value if free,
+ * otherwise one above the highest priority currently in use.
+ * @param start requested priority
+ * @return an unused priority
+ */
+ private int getNewPriority(int start) {
+ if (!rolePriorityMap.containsKey(start)) {
+ return start;
+ }
+ return rolePriorityMap.lastKey() + 1;
+ }
+
+ /**
+ * Get the desired instance count of a role, rejecting negative values
+ * @param resources resource map
+ * @param roleGroup role group
+ * @return the instance count (0 if unset)
+ * @throws BadConfigException if the count is negative
+ */
+ private int getDesiredInstanceCount(ConfTreeOperations resources,
+ String roleGroup) throws BadConfigException {
+ int desiredInstanceCount =
+ resources.getComponentOptInt(roleGroup, COMPONENT_INSTANCES, 0);
+
+ if (desiredInstanceCount < 0) {
+ log.error("Role {} has negative desired instances : {}", roleGroup,
+ desiredInstanceCount);
+ // "%d" replaces a bare "(%)" which was not a valid format conversion,
+ // so the offending count never appeared in the exception message
+ throw new BadConfigException(
+ "Negative instance count (%d) requested for component %s",
+ desiredInstanceCount, roleGroup);
+ }
+ return desiredInstanceCount;
+ }
+
+ /**
+ * Probe: does a component/group have the unique-names option enabled?
+ * Returns a primitive boolean (was boxed Boolean); all callers use it
+ * directly in boolean context, so this avoids pointless autoboxing.
+ * @param resources resource configuration to look the component up in
+ * @param group component/group name
+ * @return true iff the component exists and sets UNIQUE_NAMES to true
+ */
+ private boolean hasUniqueNames(ConfTreeOperations resources, String group) {
+ MapOperations component = resources.getComponent(group);
+ if (component == null) {
+ log.info("Component was null for {} when checking unique names", group);
+ return false;
+ }
+ return component.getOptionBool(UNIQUE_NAMES, false);
+ }
+
+ /**
+ * Add knowledge of a role.
+ * This is a build-time operation that is not synchronized, and
+ * should be used while setting up the system state -before servicing
+ * requests.
+ * @param providerRole role to add
+ * @return the role status built up
+ * @throws BadConfigException if a role of that priority already exists
+ */
+ public RoleStatus buildRole(ProviderRole providerRole) throws BadConfigException {
+ // build role status map
+ int priority = providerRole.id;
+ if (roleStatusMap.containsKey(priority)) {
+ throw new BadConfigException("Duplicate Provider Key: %s and %s",
+ providerRole,
+ roleStatusMap.get(priority));
+ }
+ RoleStatus roleStatus = new RoleStatus(providerRole);
+ roleStatusMap.put(priority, roleStatus);
+ String name = providerRole.name;
+ roles.put(name, providerRole);
+ rolePriorityMap.put(priority, providerRole);
+ // register its entries
+ metricsAndMonitoring.addMetricSet(MetricsConstants.PREFIX_SLIDER_ROLES + name, roleStatus);
+ return roleStatus;
+ }
+
+ /**
+ * Build up the requirements of every resource
+ * (fills in each role's Resource record from its configuration).
+ */
+ private void buildRoleResourceRequirements() {
+ for (RoleStatus role : roleStatusMap.values()) {
+ role.setResourceRequirements(
+ buildResourceRequirements(role, recordFactory.newResource()));
+ }
+ }
+
+ /**
+ * build up the special master node, which lives
+ * in the live node set but has a lifecycle bonded to the AM
+ * @param containerId the AM master
+ * @param host hostname
+ * @param amPort port
+ * @param nodeHttpAddress http address: may be null
+ */
+ public void buildAppMasterNode(ContainerId containerId,
+ String host,
+ int amPort,
+ String nodeHttpAddress) {
+ // synthesize a Container record for the AM itself
+ Container container = new ContainerPBImpl();
+ container.setId(containerId);
+ NodeId nodeId = NodeId.newInstance(host, amPort);
+ container.setNodeId(nodeId);
+ container.setNodeHttpAddress(nodeHttpAddress);
+ RoleInstance am = new RoleInstance(container);
+ am.role = SliderKeys.COMPONENT_AM;
+ am.group = SliderKeys.COMPONENT_AM;
+ am.roleId = SliderKeys.ROLE_AM_PRIORITY_INDEX;
+ am.createTime =now();
+ am.startTime = am.createTime;
+ appMasterNode = am;
+ //it is also added to the set of live nodes
+ getLiveContainers().put(containerId, am);
+ putOwnedContainer(containerId, am);
+
+ // patch up the role status
+ RoleStatus roleStatus = roleStatusMap.get(SliderKeys.ROLE_AM_PRIORITY_INDEX);
+ roleStatus.setDesired(1);
+ roleStatus.incActual();
+ roleStatus.incStarted();
+ }
+
  /**
   * Note that the master node has been launched,
   * though it isn't considered live until any forked
   * processes are running. It is NOT registered with
   * the role history -the container is incomplete
   * and it will just cause confusion
   */
  public void noteAMLaunched() {
    // (re-)insert the AM entry into the live container map
    getLiveContainers().put(appMasterNode.getContainerId(), appMasterNode);
  }
+
  /**
   * AM declares ourselves live in the cluster description.
   * This is meant to be triggered from the callback
   * indicating the spawned process is up and running.
   */
  public void noteAMLive() {
    // mark the AM instance itself as live
    appMasterNode.state = STATE_LIVE;
  }
+
+ /**
+ * Look up the status entry of a role or raise an exception
+ * @param key role ID
+ * @return the status entry
+ * @throws RuntimeException if the role cannot be found
+ */
+ public RoleStatus lookupRoleStatus(int key) {
+ RoleStatus rs = getRoleStatusMap().get(key);
+ if (rs == null) {
+ throw new RuntimeException("Cannot find role for role ID " + key);
+ }
+ return rs;
+ }
+
  /**
   * Look up the status entry of a container or raise an exception
   *
   * @param c container
   * @return the status entry
   * @throws RuntimeException if the role cannot be found
   */
  public RoleStatus lookupRoleStatus(Container c) {
    // the role ID is encoded in the container's priority field
    return lookupRoleStatus(ContainerPriority.extractRole(c));
  }
+
+ /**
+ * Get a deep clone of the role status list. Concurrent events may mean this
+ * list (or indeed, some of the role status entries) may be inconsistent
+ * @return a snapshot of the role status entries
+ */
+ public List<RoleStatus> cloneRoleStatusList() {
+ Collection<RoleStatus> statuses = roleStatusMap.values();
+ List<RoleStatus> statusList = new ArrayList<>(statuses.size());
+ try {
+ for (RoleStatus status : statuses) {
+ statusList.add((RoleStatus)(status.clone()));
+ }
+ } catch (CloneNotSupportedException e) {
+ log.warn("Unexpected cloning failure: {}", e, e);
+ }
+ return statusList;
+ }
+
+
+ /**
+ * Look up a role in the map
+ * @param name role name
+ * @return the instance
+ * @throws YarnRuntimeException if not found
+ */
+ public RoleStatus lookupRoleStatus(String name) throws YarnRuntimeException {
+ ProviderRole providerRole = roles.get(name);
+ if (providerRole == null) {
+ throw new YarnRuntimeException("Unknown role " + name);
+ }
+ return lookupRoleStatus(providerRole.id);
+ }
+
+
+ /**
+ * Clone the list of active (==owned) containers
+ * @return the list of role instances representing all owned containers
+ */
+ public synchronized List<RoleInstance> cloneOwnedContainerList() {
+ Collection<RoleInstance> values = ownedContainers.values();
+ return new ArrayList<>(values);
+ }
+
  /**
   * Get the number of active (==owned) containers
   * @return the current size of the owned container map
   */
  public int getNumOwnedContainers() {
    return ownedContainers.size();
  }
+
  /**
   * Look up an active container: any container that the AM has, even
   * if it is not currently running/live
   * @param id container ID
   * @return the instance, or null if the container is not owned
   */
  public RoleInstance getOwnedContainer(ContainerId id) {
    return ownedContainers.get(id);
  }
+
  /**
   * Remove an owned container
   * @param id container ID
   * @return the instance removed, or null if none was mapped to that ID
   */
  private RoleInstance removeOwnedContainer(ContainerId id) {
    return ownedContainers.remove(id);
  }
+
  /**
   * set/update an owned container
   * @param id container ID
   * @param instance role instance to associate with that ID
   * @return the previous instance mapped to the ID, or null if there was none
   */
  private RoleInstance putOwnedContainer(ContainerId id,
      RoleInstance instance) {
    return ownedContainers.put(id, instance);
  }
+
+ /**
+ * Clone the live container list. This is synchronized.
+ * @return a snapshot of the live node list
+ */
+ public synchronized List<RoleInstance> cloneLiveContainerInfoList() {
+ List<RoleInstance> allRoleInstances;
+ Collection<RoleInstance> values = getLiveContainers().values();
+ allRoleInstances = new ArrayList<>(values);
+ return allRoleInstances;
+ }
+
+ /**
+ * Lookup live instance by string value of container ID
+ * @param containerId container ID as a string
+ * @return the role instance for that container
+ * @throws NoSuchNodeException if it does not exist
+ */
+ public synchronized RoleInstance getLiveInstanceByContainerID(String containerId)
+ throws NoSuchNodeException {
+ Collection<RoleInstance> nodes = getLiveContainers().values();
+ return findNodeInCollection(containerId, nodes);
+ }
+
+ /**
+ * Lookup owned instance by string value of container ID
+ * @param containerId container ID as a string
+ * @return the role instance for that container
+ * @throws NoSuchNodeException if it does not exist
+ */
+ public synchronized RoleInstance getOwnedInstanceByContainerID(String containerId)
+ throws NoSuchNodeException {
+ Collection<RoleInstance> nodes = ownedContainers.values();
+ return findNodeInCollection(containerId, nodes);
+ }
+
+ /**
+ * Iterate through a collection of role instances to find one with a
+ * specific (string) container ID
+ * @param containerId container ID as a string
+ * @param nodes collection
+ * @return the found node
+ * @throws NoSuchNodeException if there was no match
+ */
+ private RoleInstance findNodeInCollection(String containerId,
+ Collection<RoleInstance> nodes) throws NoSuchNodeException {
+ RoleInstance found = null;
+ for (RoleInstance node : nodes) {
+ if (containerId.equals(node.id)) {
+ found = node;
+ break;
+ }
+ }
+ if (found != null) {
+ return found;
+ } else {
+ //at this point: no node
+ throw new NoSuchNodeException("Unknown node: " + containerId);
+ }
+ }
+
+ public synchronized List<RoleInstance> getLiveInstancesByContainerIDs(
+ Collection<String> containerIDs) {
+ //first, a hashmap of those containerIDs is built up
+ Set<String> uuidSet = new HashSet<String>(containerIDs);
+ List<RoleInstance> nodes = new ArrayList<RoleInstance>(uuidSet.size());
+ Collection<RoleInstance> clusterNodes = getLiveContainers().values();
+
+ for (RoleInstance node : clusterNodes) {
+ if (uuidSet.contains(node.id)) {
+ nodes.add(node);
+ }
+ }
+ //at this point: a possibly empty list of nodes
+ return nodes;
+ }
+
+ /**
+ * Enum all nodes by role.
+ * @param role role, or "" for all roles
+ * @return a list of nodes, may be empty
+ */
+ public synchronized List<RoleInstance> enumLiveNodesInRole(String role) {
+ List<RoleInstance> nodes = new ArrayList<RoleInstance>();
+ Collection<RoleInstance> allRoleInstances = getLiveContainers().values();
+ for (RoleInstance node : allRoleInstances) {
+ if (role.isEmpty() || role.equals(node.role)) {
+ nodes.add(node);
+ }
+ }
+ return nodes;
+ }
+
+
+ /**
+ * enum nodes by role ID, from either the owned or live node list
+ * @param roleId role the container must be in
+ * @param owned flag to indicate "use owned list" rather than the smaller
+ * "live" list
+ * @return a list of nodes, may be empty
+ */
+ public synchronized List<RoleInstance> enumNodesWithRoleId(int roleId,
+ boolean owned) {
+ List<RoleInstance> nodes = new ArrayList<RoleInstance>();
+ Collection<RoleInstance> allRoleInstances;
+ allRoleInstances = owned ? ownedContainers.values() : liveNodes.values();
+ for (RoleInstance node : allRoleInstances) {
+ if (node.roleId == roleId) {
+ nodes.add(node);
+ }
+ }
+ return nodes;
+ }
+
+ /**
+ * Build an instance map.
+ * @return the map of Role name to list of role instances
+ */
+ private synchronized Map<String, List<String>> createRoleToInstanceMap() {
+ Map<String, List<String>> map = new HashMap<String, List<String>>();
+ for (RoleInstance node : getLiveContainers().values()) {
+ List<String> containers = map.get(node.role);
+ if (containers == null) {
+ containers = new ArrayList<String>();
+ map.put(node.role, containers);
+ }
+ containers.add(node.id);
+ }
+ return map;
+ }
+
+ /**
+ * Build a map of role->nodename->node-info
+ *
+ * @return the map of Role name to list of Cluster Nodes
+ */
+ public synchronized Map<String, Map<String, ClusterNode>> createRoleToClusterNodeMap() {
+ Map<String, Map<String, ClusterNode>> map = new HashMap<>();
+ for (RoleInstance node : getLiveContainers().values()) {
+
+ Map<String, ClusterNode> containers = map.get(node.role);
+ if (containers == null) {
+ containers = new HashMap<String, ClusterNode>();
+ map.put(node.role, containers);
+ }
+ ClusterNode clusterNode = node.toClusterNode();
+ containers.put(clusterNode.name, clusterNode);
+ }
+ return map;
+ }
+
  /**
   * Notification called just before the NM is asked to
   * start a container; moves the instance into the "starting" and
   * "owned" maps and notifies the role history.
   * @param container container to start
   * @param instance clusterNode structure
   */
  public void containerStartSubmitted(Container container,
      RoleInstance instance) {
    instance.state = STATE_SUBMITTED;
    instance.container = container;
    instance.createTime = now();
    // tracked as starting until the NM start callback arrives
    getStartingContainers().put(container.getId(), instance);
    putOwnedContainer(container.getId(), instance);
    roleHistory.onContainerStartSubmitted(container, instance);
  }
+
  /**
   * Note that a container has been submitted for release; update internal state
   * and mark the associated ContainerInfo released field to indicate that
   * while it is still in the active list, it has been queued for release.
   *
   * @param container container
   * @throws SliderInternalStateException if there is no container of that ID
   * on the active list, or it is already queued for release
   */
  public synchronized void containerReleaseSubmitted(Container container)
      throws SliderInternalStateException {
    ContainerId id = container.getId();
    //look up the container
    RoleInstance instance = getOwnedContainer(id);
    if (instance == null) {
      throw new SliderInternalStateException(
          "No active container with ID " + id);
    }
    //verify that it isn't already released
    if (containersBeingReleased.containsKey(id)) {
      throw new SliderInternalStateException(
          "Container %s already queued for release", id);
    }
    // flag the instance, queue it, and bump the role's releasing counter
    instance.released = true;
    containersBeingReleased.put(id, instance.container);
    RoleStatus role = lookupRoleStatus(instance.roleId);
    role.incReleasing();
    roleHistory.onContainerReleaseSubmitted(container);
  }
+
+ /**
+ * Create a container request.
+ * Update internal state, such as the role request count.
+ * Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here.
+ * This is where role history information will be used for placement decisions.
+ * @param role role
+ * @return the container request to submit or null if there is none
+ */
+ private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) {
+ if (role.isAntiAffinePlacement()) {
+ return createAAContainerRequest(role);
+ } else {
+ incrementRequestCount(role);
+ OutstandingRequest request = roleHistory.requestContainerForRole(role);
+ if (request != null) {
+ return request.getIssuedRequest();
+ } else {
+ return null;
+ }
+ }
+ }
+
  /**
   * Create a container request for an anti-affine role.
   * Update internal state, such as the role request count.
   * The {@link RoleStatus#outstandingAArequest} is set here.
   * This is where role history information will be used for placement decisions.
   * @param role role
   * @return the container request to submit or null if there is none
   * (i.e. the role history could not build an AA request)
   */
  private AMRMClient.ContainerRequest createAAContainerRequest(RoleStatus role) {
    OutstandingRequest request = roleHistory.requestContainerForAARole(role);
    if (request == null) {
      return null;
    }
    // only count the request once it is known a request will be issued
    incrementRequestCount(role);
    role.setOutstandingAArequest(request);
    return request.getIssuedRequest();
  }
+
  /**
   * Increment the request count of a role.
   * <p>
   * Also updates application state counters
   * @param role role being requested.
   */
  protected void incrementRequestCount(RoleStatus role) {
    role.incRequested();
    // keep the app-wide outstanding-request metric in step
    incOutstandingContainerRequests();
  }
+
  /**
   * Inc #of outstanding requests.
   */
  private void incOutstandingContainerRequests() {
    outstandingContainerRequests.inc();
  }
+
  /**
   * Decrement the number of outstanding requests. This never goes below zero.
   */
  private void decOutstandingContainerRequests() {
    // lock on the counter itself so the check-then-dec pair is atomic
    synchronized (outstandingContainerRequests) {
      if (outstandingContainerRequests.getCount() > 0) {
        // decrement but never go below zero
        outstandingContainerRequests.dec();
      }
    }
  }
+
+
  /**
   * Get the value of a YARN requirement (cores, RAM, etc).
   * These are returned as integers, but there is special handling of the
   * string {@link ResourceKeys#YARN_RESOURCE_MAX}, which triggers
   * the return of the maximum value.
   * @param resources resource configuration to query
   * @param group component to get from
   * @param option option name
   * @param defVal default value
   * @param maxVal value to return if the max val is requested
   * @return parsed value
   * @throws NumberFormatException if the option value could not be parsed.
   */
  private int getResourceRequirement(ConfTreeOperations resources,
      String group,
      String option,
      int defVal,
      int maxVal) {

    String val = resources.getComponentOpt(group, option,
        Integer.toString(defVal));
    Integer intVal;
    if (YARN_RESOURCE_MAX.equals(val)) {
      intVal = maxVal;
    } else {
      // decode handles decimal, hex (0x...) and octal (0...) forms
      intVal = Integer.decode(val);
    }
    return intVal;
  }
+
  /**
   * Build up the resource requirements for this role from the
   * cluster specification, including substituting max allowed values
   * if the specification asked for it.
   * @param role role
   * @param capability capability to set up. A new one may be created
   * during normalization
   * @return the normalized resource requirements
   */
  public Resource buildResourceRequirements(RoleStatus role, Resource capability) {
    // Set up resource requirements from role values
    String name = role.getName();
    String group = role.getGroup();
    ConfTreeOperations resources = getResourcesSnapshot();
    int cores = getResourceRequirement(resources,
        group,
        YARN_CORES,
        DEF_YARN_CORES,
        containerMaxCores);
    capability.setVirtualCores(cores);
    int ram = getResourceRequirement(resources, group,
        YARN_MEMORY,
        DEF_YARN_MEMORY,
        containerMaxMemory);
    capability.setMemory(ram);
    log.debug("Component {} has RAM={}, vCores ={}", name, ram, cores);
    // clamp/round the ask to the scheduler's min/max allocation
    Resource normalized = recordFactory.normalize(capability, minResource,
        maxResource);
    if (!Resources.equals(normalized, capability)) {
      // resource requirements normalized to something other than asked for.
      // LOG @ WARN so users can see why this is happening.
      log.warn("Resource requirements of {} normalized" +
          " from {} to {}", name, capability, normalized);
    }
    return normalized;
  }
+
  /**
   * add a launched container to the node map for status responses
   * @param container the container that has been launched
   * @param node node details
   */
  private void addLaunchedContainer(Container container, RoleInstance node) {
    node.container = container;
    if (node.role == null) {
      // instances must always carry a role name before going live
      throw new RuntimeException(
          "Unknown role for node " + node);
    }
    getLiveContainers().put(node.getContainerId(), node);
    //tell role history
    roleHistory.onContainerStarted(container);
  }
+
+ /**
+ * container start event
+ * @param containerId container that is to be started
+ * @return the role instance, or null if there was a problem
+ */
+ public synchronized RoleInstance onNodeManagerContainerStarted(ContainerId containerId) {
+ try {
+ return innerOnNodeManagerContainerStarted(containerId);
+ } catch (YarnRuntimeException e) {
+ log.error("NodeManager callback on started container {} failed",
+ containerId,
+ e);
+ return null;
+ }
+ }
+
  /**
   * container start event handler -throwing an exception on problems
   * @param containerId container that is to be started
   * @return the role instance
   * @throws YarnRuntimeException if the container is not owned, has no
   * role name, or was not in the starting-container map
   */
  @VisibleForTesting
  public RoleInstance innerOnNodeManagerContainerStarted(ContainerId containerId) {
    incStartedCountainerCount();
    RoleInstance instance = getOwnedContainer(containerId);
    if (instance == null) {
      //serious problem
      throw new YarnRuntimeException("Container not in active containers start "+
          containerId);
    }
    if (instance.role == null) {
      throw new YarnRuntimeException("Component instance has no instance name " +
          instance);
    }
    instance.startTime = now();
    // removal from the starting map doubles as a duplicate-event check
    RoleInstance starting = getStartingContainers().remove(containerId);
    if (null == starting) {
      throw new YarnRuntimeException(
          "Container "+ containerId +" is already started");
    }
    instance.state = STATE_LIVE;
    RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
    roleStatus.incStarted();
    Container container = instance.container;
    // moves the instance into the live map and notifies role history
    addLaunchedContainer(container, instance);
    return instance;
  }
+
  /**
   * update the application state after a failure to start a container.
   * This is perhaps where blacklisting could be most useful: failure
   * to start a container is a sign of a more serious problem
   * than a later exit.
   *
   * -relayed from NMClientAsync.CallbackHandler
   * @param containerId failing container
   * @param thrown what was thrown; may be null
   */
  public synchronized void onNodeManagerContainerStartFailed(ContainerId containerId,
      Throwable thrown) {
    removeOwnedContainer(containerId);
    incFailedCountainerCount();
    incStartFailedCountainerCount();
    RoleInstance instance = getStartingContainers().remove(containerId);
    // if the container was not in the starting map, nothing more to record
    if (null != instance) {
      RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
      String text;
      if (null != thrown) {
        text = SliderUtils.stringify(thrown);
      } else {
        text = "container start failure";
      }
      instance.diagnostics = text;
      // start failures always count as "short lived" failures
      roleStatus.noteFailed(true, text, ContainerOutcome.Failed);
      getFailedContainers().put(containerId, instance);
      roleHistory.onNodeManagerContainerStartFailed(instance.container);
    }
  }
+
+ /**
+ * Handle node update from the RM. This syncs up the node map with the RM's view
+ * @param updatedNodes updated nodes
+ */
+ public synchronized NodeUpdatedOutcome onNodesUpdated(List<NodeReport> updatedNodes) {
+ boolean changed = roleHistory.onNodesUpdated(updatedNodes);
+ if (changed) {
+ log.info("YARN cluster changed \u2014cancelling current AA requests");
+ List<AbstractRMOperation> operations = cancelOutstandingAARequests();
+ log.debug("Created {} cancel requests", operations.size());
+ return new NodeUpdatedOutcome(true, operations);
+ }
+ return new NodeUpdatedOutcome(false, new ArrayList<AbstractRMOperation>(0));
+ }
+
  /**
   * Return value of the {@link #onNodesUpdated(List)} call.
   */
  public static class NodeUpdatedOutcome {
    // true if the role history reported a change in the YARN cluster
    public final boolean clusterChanged;
    // RM operations (e.g. AA request cancellations) to submit in response
    public final List<AbstractRMOperation> operations;

    public NodeUpdatedOutcome(boolean clusterChanged,
        List<AbstractRMOperation> operations) {
      this.clusterChanged = clusterChanged;
      this.operations = operations;
    }
  }
+ /**
+ * Is a role short lived by the threshold set for this application
+ * @param instance instance
+ * @return true if the instance is considered short lived
+ */
+ @VisibleForTesting
+ public boolean isShortLived(RoleInstance instance) {
+ long time = now();
+ long started = instance.startTime;
+ boolean shortlived;
+ if (started > 0) {
+ long duration = time - started;
+ shortlived = duration < (startTimeThreshold * 1000);
+ log.info("Duration {} and startTimeThreshold {}", duration, startTimeThreshold);
+ } else {
+ // never even saw a start event
+ shortlived = true;
+ }
+ return shortlived;
+ }
+
  /**
   * Current time in milliseconds. Made protected for
   * the option to override it in tests.
   * @return the current time.
   */
  protected long now() {
    return System.currentTimeMillis();
  }
+
+ /**
+ * This is a very small class to send a multiple result back from
+ * the completion operation
+ */
+ public static class NodeCompletionResult {
+ public boolean surplusNode = false;
+ public RoleInstance roleInstance;
+ // did the container fail for *any* reason?
+ public boolean containerFailed = false;
+ // detailed outcome on the container failure
+ public ContainerOutcome outcome = ContainerOutcome.Completed;
+ public int exitStatus = 0;
+ public boolean unknownNode = false;
+
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("NodeCompletionResult{");
+ sb.append("surplusNode=").append(surplusNode);
+ sb.append(", roleInstance=").append(roleInstance);
+ sb.append(", exitStatus=").append(exitStatus);
+ sb.append(", containerFailed=").append(containerFailed);
+ sb.append(", outcome=").append(outcome);
+ sb.append(", unknownNode=").append(unknownNode);
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
  /**
   * handle completed node in the CD -move something from the live
   * server list to the completed server list.
   * Handles three cases: a queued release completing, a surplus
   * container being purged, and an unexpected failure/kill.
   * @param status the node that has just completed
   * @return NodeCompletionResult describing what happened
   */
  public synchronized NodeCompletionResult onCompletedNode(ContainerStatus status) {
    ContainerId containerId = status.getContainerId();
    NodeCompletionResult result = new NodeCompletionResult();
    RoleInstance roleInstance;

    int exitStatus = status.getExitStatus();
    result.exitStatus = exitStatus;
    if (containersBeingReleased.containsKey(containerId)) {
      // case 1: a release we asked for has completed
      log.info("Container was queued for release : {}", containerId);
      Container container = containersBeingReleased.remove(containerId);
      RoleStatus roleStatus = lookupRoleStatus(container);
      long releasing = roleStatus.decReleasing();
      long actual = roleStatus.decActual();
      long completedCount = roleStatus.incCompleted();
      log.info("decrementing role count for role {} to {}; releasing={}, completed={}",
          roleStatus.getName(),
          actual,
          releasing,
          completedCount);
      result.outcome = ContainerOutcome.Completed;
      roleHistory.onReleaseCompleted(container);

    } else if (surplusNodes.remove(containerId)) {
      //its a surplus one being purged
      result.surplusNode = true;
    } else {
      // a container has failed or been killed
      // use the exit code to determine the outcome
      result.containerFailed = true;
      result.outcome = ContainerOutcome.fromExitStatus(exitStatus);

      roleInstance = removeOwnedContainer(containerId);
      if (roleInstance != null) {
        //it was active, move it to failed
        incFailedCountainerCount();
        failedContainers.put(containerId, roleInstance);
      } else {
        // the container may have been noted as failed already, so look
        // it up
        roleInstance = failedContainers.get(containerId);
      }
      if (roleInstance != null) {
        int roleId = roleInstance.roleId;
        String rolename = roleInstance.role;
        log.info("Failed container in role[{}] : {}", roleId, rolename);
        try {
          RoleStatus roleStatus = lookupRoleStatus(roleId);
          roleStatus.decActual();
          boolean shortLived = isShortLived(roleInstance);
          String message;
          Container failedContainer = roleInstance.container;

          //build the failure message
          if (failedContainer != null) {
            String completedLogsUrl = getLogsURLForContainer(failedContainer);
            message = String.format("Failure %s on host %s (%d): %s",
                roleInstance.getContainerId(),
                failedContainer.getNodeId().getHost(),
                exitStatus,
                completedLogsUrl);
          } else {
            message = String.format("Failure %s (%d)", containerId, exitStatus);
          }
          roleStatus.noteFailed(shortLived, message, result.outcome);
          long failed = roleStatus.getFailed();
          log.info("Current count of failed role[{}] {} = {}",
              roleId, rolename, failed);
          if (failedContainer != null) {
            roleHistory.onFailedContainer(failedContainer, shortLived, result.outcome);
          }

        } catch (YarnRuntimeException e1) {
          // NOTE(review): the exception itself is intentionally not logged
          // here — only the role ID; consider including e1 for diagnostics
          log.error("Failed container of unknown role {}", roleId);
        }
      } else {
        //this isn't a known container.

        log.error("Notified of completed container {} that is not in the list" +
            " of active or failed containers", containerId);
        completionOfUnknownContainerEvent.incrementAndGet();
        result.unknownNode = true;
      }
    }

    if (result.surplusNode) {
      //a surplus node: no live/owned bookkeeping needed
      return result;
    }

    //record the complete node's details; this pulls it from the livenode set
    //remove the node
    ContainerId id = status.getContainerId();
    log.info("Removing node ID {}", id);
    RoleInstance node = getLiveContainers().remove(id);
    if (node != null) {
      node.state = STATE_DESTROYED;
      node.exitCode = exitStatus;
      node.diagnostics = status.getDiagnostics();
      getCompletedContainers().put(id, node);
      result.roleInstance = node;
    } else {
      // not in the list
      log.warn("Received notification of completion of unknown node {}", id);
      completionOfNodeNotInLiveListEvent.incrementAndGet();
    }

    // and the active node list if present
    removeOwnedContainer(containerId);

    // finally, verify the node doesn't exist any more
    assert !containersBeingReleased.containsKey(
        containerId) : "container still in release queue";
    assert !getLiveContainers().containsKey(
        containerId) : " container still in live nodes";
    assert getOwnedContainer(containerId) ==
        null : "Container still in active container list";

    return result;
  }
+
  /**
   * Get the URL log for a container
   * @param c container; may be null
   * @return the URL, "" if it cannot be determined, or null if
   * the container itself was null
   */
  protected String getLogsURLForContainer(Container c) {
    if (c==null) {
      return null;
    }
    String user = null;
    try {
      user = SliderUtils.getCurrentUser().getShortUserName();
    } catch (IOException ignored) {
      // best-effort: no user name simply means no URL can be built
    }
    String completedLogsUrl = "";
    String url = logServerURL;
    if (user != null && SliderUtils.isSet(url)) {
      completedLogsUrl = url
          + "/" + c.getNodeId() + "/" + c.getId() + "/ctx/" + user;
    }
    return completedLogsUrl;
  }
+
  /**
   * Return the percentage done that Slider is to have YARN display in its
   * Web UI
   * @return an number from 0 to 100
   */
  public synchronized float getApplicationProgressPercentage() {
    float percentage;
    long desired = 0;
    float actual = 0;
    // sum desired vs actual instance counts across all roles
    for (RoleStatus role : getRoleStatusMap().values()) {
      desired += role.getDesired();
      actual += role.getActual();
    }
    if (desired == 0) {
      percentage = 100;
    } else {
      // NOTE(review): this returns a 0..1 fraction, which is inconsistent
      // with both the javadoc and the desired == 0 branch above (100).
      // Confirm which scale the caller expects before changing either branch.
      percentage = actual / desired;
    }
    return percentage;
  }
+
  /**
   * Update the cluster description with the current application state,
   * with no extra provider-supplied info entries.
   * @return the refreshed cluster description
   */

  public ClusterDescription refreshClusterStatus() {
    return refreshClusterStatus(null);
  }
+
  /**
   * Update the cluster description with the current application state
   * @param providerStatus status from the provider for the cluster info
   * section; may be null
   * @return the refreshed cluster description
   */
  public synchronized ClusterDescription refreshClusterStatus(Map<String, String> providerStatus) {
    ClusterDescription cd = getClusterStatus();
    long now = now();
    cd.setInfoTime(StatusKeys.INFO_STATUS_TIME_HUMAN,
        StatusKeys.INFO_STATUS_TIME_MILLIS,
        now);
    if (providerStatus != null) {
      // merge in any provider-supplied info entries
      for (Map.Entry<String, String> entry : providerStatus.entrySet()) {
        cd.setInfo(entry.getKey(), entry.getValue());
      }
    }
    MapOperations infoOps = new MapOperations("info", cd.info);
    infoOps.mergeWithoutOverwrite(applicationInfo);
    SliderUtils.addBuildInfo(infoOps, "status");
    cd.statistics = new HashMap<>();

    // build the map of node -> container IDs
    Map<String, List<String>> instanceMap = createRoleToInstanceMap();
    cd.instances = instanceMap;

    //build the map of node -> containers
    Map<String, Map<String, ClusterNode>> clusterNodes =
        createRoleToClusterNodeMap();
    log.info("app state clusterNodes {} ", clusterNodes.toString());
    cd.status = new HashMap<>();
    cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, clusterNodes);


    // per-role: copy group options for unique-named roles, then publish
    // the instance counters and failure statistics
    for (RoleStatus role : getRoleStatusMap().values()) {
      String rolename = role.getName();
      if (hasUniqueNames(instanceDefinition.getResourceOperations(),
          role.getGroup())) {
        cd.setRoleOpt(rolename, COMPONENT_PRIORITY, role.getPriority());
        cd.setRoleOpt(rolename, ROLE_GROUP, role.getGroup());
        MapOperations groupOptions = instanceDefinition.getResourceOperations()
            .getComponent(role.getGroup());
        SliderUtils.mergeMapsIgnoreDuplicateKeys(cd.getRole(rolename),
            groupOptions.options);
      }
      List<String> instances = instanceMap.get(rolename);
      int nodeCount = instances != null ? instances.size(): 0;
      cd.setRoleOpt(rolename, COMPONENT_INSTANCES,
          role.getDesired());
      cd.setRoleOpt(rolename, ROLE_ACTUAL_INSTANCES, nodeCount);
      cd.setRoleOpt(rolename, ROLE_REQUESTED_INSTANCES, role.getRequested());
      cd.setRoleOpt(rolename, ROLE_RELEASING_INSTANCES, role.getReleasing());
      cd.setRoleOpt(rolename, ROLE_FAILED_INSTANCES, role.getFailed());
      cd.setRoleOpt(rolename, ROLE_FAILED_STARTING_INSTANCES, role.getStartFailed());
      cd.setRoleOpt(rolename, ROLE_FAILED_RECENTLY_INSTANCES, role.getFailedRecently());
      cd.setRoleOpt(rolename, ROLE_NODE_FAILED_INSTANCES, role.getNodeFailed());
      cd.setRoleOpt(rolename, ROLE_PREEMPTED_INSTANCES, role.getPreempted());
      if (role.isAntiAffinePlacement()) {
        cd.setRoleOpt(rolename, ROLE_PENDING_AA_INSTANCES, role.getPendingAntiAffineRequests());
      }
      Map<String, Integer> stats = role.buildStatistics();
      cd.statistics.put(rolename, stats);
    }

    Map<String, Integer> sliderstats = getLiveStatistics();
    cd.statistics.put(SliderKeys.COMPONENT_AM, sliderstats);

    // liveness
    cd.liveness = getApplicationLivenessInformation();

    return cd;
  }
+
+ /**
+ * get application liveness information
+ * @return a snapshot of the current liveness information
+ */
+ public ApplicationLivenessInformation getApplicationLivenessInformation() {
+ ApplicationLivenessInformation li = new ApplicationLivenessInformation();
+ RoleStatistics stats = getRoleStatistics();
+ int outstanding = (int)(stats.desired - stats.actual);
+ li.requestsOutstanding = outstanding;
+ li.allRequestsSatisfied = outstanding <= 0;
+ li.activeRequests = (int)stats.requested;
+ return li;
+ }
+
+ /**
+ * Get the live statistics map
+ * @return a map of statistics values, defined in the {@link StatusKeys}
+ * keylist.
+ */
+ protected Map<String, Integer> getLiveStatistics() {
+ Map<String, Integer> sliderstats = new HashMap<>();
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE,
+ liveNodes.size());
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED,
+ completedContainerCount.intValue());
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED,
+ failedContainerCount.intValue());
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED,
+ startedContainers.intValue());
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED,
+ startFailedContainerCount.intValue());
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS,
+ surplusContainers.intValue());
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED,
+ completionOfUnknownContainerEvent.get());
+ return sliderstats;
+ }
+
+ /**
+ * Get the aggregate statistics across all roles
+ * @return role statistics
+ */
+ public RoleStatistics getRoleStatistics() {
+ RoleStatistics stats = new RoleStatistics();
+ for (RoleStatus role : getRoleStatusMap().values()) {
+ stats.add(role.getStatistics());
+ }
+ return stats;
+ }
+
+ /**
+ * Get a snapshot of component information.
+ * <p>
+ * This does <i>not</i> include any container list, which
+ * is more expensive to create.
+ * @return a map of current role status values.
+ */
+ public Map<String, ComponentInformation> getComponentInfoSnapshot() {
+
+ Map<Integer, RoleStatus> statusMap = getRoleStatusMap();
+ Map<String, ComponentInformation> results = new HashMap<>(
+ statusMap.size());
+
+ for (RoleStatus status : statusMap.values()) {
+ String name = status.getName();
+ ComponentInformation info = status.serialize();
+ results.put(name, info);
+ }
+ return results;
+ }
+
+ /**
+ * Look at where the current node state is -and whether it should be changed
+ */
+ public synchronized List<AbstractRMOperation> reviewRequestAndReleaseNodes()
+ throws SliderInternalStateException, TriggerClusterTeardownException {
+ log.debug("in reviewRequestAndReleaseNodes()");
+ List<AbstractRMOperation> allOperations = new ArrayList<>();
+ for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+ if (!roleStatus.isExcludeFromFlexing()) {
+ List<AbstractRMOperation> operations = reviewOneRole(roleStatus);
+ allOperations.addAll(operations);
+ }
+ }
+ return allOperations;
+ }
+
+ /**
+ * Check the "recent" failure threshold for a role
+ * @param role role to examine
+ * @throws TriggerClusterTeardownException if the role
+ * has failed too many times
+ */
+ private void checkFailureThreshold(RoleStatus role)
+ throws TriggerClusterTeardownException {
+ long failures = role.getFailedRecently();
+ int threshold = getFailureThresholdForRole(role);
+ if (log.isDebugEnabled() && failures > 0) {
+ log.debug("Failure count of component: {}: {}, threshold={}",
+ role.getName(), failures, threshold);
+ }
+
+ if (failures > threshold) {
+ throw new TriggerClusterTeardownException(
+ SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
+ FinalApplicationStatus.FAILED, ErrorStrings.E_UNSTABLE_CLUSTER +
+ " - failed with component %s failed 'recently' %d times (%d in startup);" +
+ " threshold is %d - last failure: %s",
+ role.getName(),
+ role.getFailed(),
+ role.getStartFailed(),
+ threshold,
+ role.getFailureMessage());
+ }
+ }
+
+ /**
+ * Get the failure threshold for a specific role, falling back to
+ * the global one if not
+ * @param roleStatus role
+ * @return the threshold for failures
+ */
+ private int getFailureThresholdForRole(RoleStatus roleStatus) {
+ ConfTreeOperations resources =
+ instanceDefinition.getResourceOperations();
+ return resources.getComponentOptInt(roleStatus.getGroup(),
+ CONTAINER_FAILURE_THRESHOLD,
+ failureThreshold);
+ }
+
+ /**
+ * Get the node failure threshold for a specific role, falling back to
+ * the global one if not
+ * @param roleGroup role group
+ * @return the threshold for failures
+ */
+ private int getNodeFailureThresholdForRole(String roleGroup) {
+ ConfTreeOperations resources =
+ instanceDefinition.getResourceOperations();
+ return resources.getComponentOptInt(roleGroup,
+ NODE_FAILURE_THRESHOLD,
+ nodeFailureThreshold);
+ }
+
+ /**
+ * Reset the "recent" failure counts of all roles
+ */
+ public void resetFailureCounts() {
+ for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+ long failed = roleStatus.resetFailedRecently();
+ log.info("Resetting failure count of {}; was {}",
+ roleStatus.getName(),
+ failed);
+ }
+ roleHistory.resetFailedRecently();
+ }
+
+ /**
+ * Escalate operation as triggered by external timer.
+ * @return a (usually empty) list of cancel/request operations.
+ */
+ public List<AbstractRMOperation> escalateOutstandingRequests() {
+ return roleHistory.escalateOutstandingRequests();
+ }
+
+ /**
+ * Cancel any outstanding AA Requests, building up the list of ops to
+ * cancel, removing them from RoleHistory structures and the RoleStatus
+ * entries.
+ * @return a (usually empty) list of cancel/request operations.
+ */
+ public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+ // get the list of cancel operations
+ List<AbstractRMOperation> operations = roleHistory.cancelOutstandingAARequests();
+ for (RoleStatus roleStatus : roleStatusMap.values()) {
+ if (roleStatus.isAARequestOutstanding()) {
+ log.info("Cancelling outstanding AA request for {}", roleStatus);
+ roleStatus.cancelOutstandingAARequest();
+ }
+ }
+ return operations;
+ }
+
/**
 * Look at the allocation status of one role, and trigger add/release
 * actions if the number of desired role instances doesn't equal
 * (actual + pending).
 * <p>
 * MUST be executed from within a synchronized method
 * <p>
 * @param role role to review
 * @return a list of request/cancel/release operations to submit
 * @throws SliderInternalStateException if the operation reveals that
 * the internal state of the application is inconsistent.
 * @throws TriggerClusterTeardownException if the role's failure
 * threshold has been exceeded, or a negative desired count is seen
 */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
    throws SliderInternalStateException, TriggerClusterTeardownException {
  List<AbstractRMOperation> operations = new ArrayList<>();
  long delta;
  long expected;
  String name = role.getName();
  // snapshot delta and desired under the role's lock so the two values
  // are mutually consistent
  synchronized (role) {
    delta = role.getDelta();
    expected = role.getDesired();
  }

  log.info("Reviewing {} : ", role);
  log.debug("Expected {}, Delta: {}", expected, delta);
  checkFailureThreshold(role);

  if (expected < 0 ) {
    // negative value: fail
    throw new TriggerClusterTeardownException(
        SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
        FinalApplicationStatus.FAILED,
        "Negative component count of %d desired for component %s",
        expected, role);
  }

  if (delta > 0) {
    // more workers needed than we have -ask for more
    log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected);

    if (role.isAntiAffinePlacement()) {
      // anti-affine roles are requested one container at a time; the
      // remainder of the delta is tracked as "pending" requests
      long pending = delta;
      if (roleHistory.canPlaceAANodes()) {
        // build one only if there is none outstanding, the role history knows
        // enough about the cluster to ask, and there is somewhere to place
        // the node
        if (!role.isAARequestOutstanding()) {
          // no outstanding AA; try to place things
          AMRMClient.ContainerRequest request = createAAContainerRequest(role);
          if (request != null) {
            pending--;
            log.info("Starting an anti-affine request sequence for {} nodes; pending={}",
              delta, pending);
            addContainerRequest(operations, request);
          } else {
            log.info("No location for anti-affine request");
          }
        }
      } else {
        // node map not yet known: no AA requests can be generated yet
        log.warn("Awaiting node map before generating anti-affinity requests");
      }
      log.info("Setting pending to {}", pending);
      role.setPendingAntiAffineRequests(pending);
    } else {

      // simple placement: issue one request per missing container,
      // letting the role history pick a node for each
      for (int i = 0; i < delta; i++) {
        //get the role history to select a suitable node, if available
        addContainerRequest(operations, createContainerRequest(role));
      }
    }
  } else if (delta < 0) {
    log.info("{}: Asking for {} fewer node(s) for a total of {}", name,
        -delta,
        expected);
    // reduce the number expected (i.e. subtract the delta)
    long excess = -delta;

    // how many requests are outstanding? for AA roles, this includes pending
    long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests();
    if (outstandingRequests > 0) {
      // outstanding requests are cancelled first, before any live
      // container is released
      int toCancel = (int)Math.min(outstandingRequests, excess);

      // Delegate to Role History
      List<AbstractRMOperation> cancellations = roleHistory.cancelRequestsForRole(role, toCancel);
      log.info("Found {} outstanding requests to cancel", cancellations.size());
      operations.addAll(cancellations);
      if (toCancel != cancellations.size()) {
        // bookkeeping has drifted from the role history; log loudly but
        // continue with what was actually cancelled
        log.error("Tracking of outstanding requests is not in sync with the summary statistics:" +
            " expected to be able to cancel {} requests, but got {}",
            toCancel, cancellations.size());
      }

      role.cancel(toCancel);
      excess -= toCancel;
      assert excess >= 0 : "Attempted to cancel too many requests";
      log.info("Submitted {} cancellations, leaving {} to release",
          toCancel, excess);
      if (excess == 0) {
        log.info("After cancelling requests, application is now at desired size");
      }
    }

    // after the cancellation there may be no excess
    if (excess > 0) {

      // there's an excess, so more to cancel
      // get the nodes to release
      int roleId = role.getKey();

      // enum all active nodes that aren't being released
      List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true);
      if (containersToRelease.isEmpty()) {
        log.info("No containers for component {}", roleId);
      }

      // filter out all release-in-progress nodes
      ListIterator<RoleInstance> li = containersToRelease.listIterator();
      while (li.hasNext()) {
        RoleInstance next = li.next();
        if (next.released) {
          li.remove();
        }
      }

      // warn if the desired state can't be reached
      int numberAvailableForRelease = containersToRelease.size();
      if (numberAvailableForRelease < excess) {
        log.warn("Not enough containers to release, have {} and need {} more",
            numberAvailableForRelease,
            excess - numberAvailableForRelease);
      }

      // ask the release selector to sort the targets
      containersToRelease = containerReleaseSelector.sortCandidates(
          roleId,
          containersToRelease);

      // crop to the excess
      List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease)
          ? containersToRelease.subList(0, (int)excess)
          : containersToRelease;

      // then build up a release operation, logging each container as released
      for (RoleInstance possible : finalCandidates) {
        log.info("Targeting for release: {}", possible);
        containerReleaseSubmitted(possible.container);
        operations.add(new ContainerReleaseOperation(possible.getId()));
      }
    }

  } else {
    // actual + requested == desired
    // there's a special case here: clear all pending AA requests
    if (role.getPendingAntiAffineRequests() > 0) {
      log.debug("Clearing outstanding pending AA requests");
      role.setPendingAntiAffineRequests(0);
    }
  }

  // there's now a list of operations to execute
  log.debug("operations scheduled: {}; updated role: {}", operations.size(), role);
  return operations;
}
+
+ /**
+ * Add a container request if the request is non-null
+ * @param operations operations to add the entry to
+ * @param containerAsk what to ask for
+ * @return true if a request was added
+ */
+ private boolean addContainerRequest(List<AbstractRMOperation> operations,
+ AMRMClient.ContainerRequest containerAsk) {
+ if (containerAsk != null) {
+ log.info("Container ask is {} and label = {}", containerAsk,
+ containerAsk.getNodeLabelExpression());
+ int askMemory = containerAsk.getCapability().getMemory();
+ if (askMemory > this.containerMaxMemory) {
+ log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
+ }
+ operations.add(new ContainerRequestOperation(containerAsk));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Releases a container based on container id
+ * @param containerId
+ * @return
+ * @throws SliderInternalStateException
+ */
+ public List<AbstractRMOperation> releaseContainer(ContainerId containerId)
+ throws SliderInternalStateException {
+ List<AbstractRMOperation> operations = new ArrayList<AbstractRMOperation>();
+ List<RoleInstance> activeRoleInstances = cloneOwnedContainerList();
+ for (RoleInstance role : activeRoleInstances) {
+ if (role.container.getId().equals(containerId)) {
+ containerReleaseSubmitted(role.container);
+ operations.add(new ContainerReleaseOperation(role.getId()));
+ }
+ }
+
+ return operations;
+ }
+
+ /**
+ * Find a container running on a specific host -looking
+ * into the node ID to determine this.
+ *
+ * @param node node
+ * @param roleId role the container must be in
+ * @return a container or null if there are no containers on this host
+ * that can be released.
+ */
+ private RoleInstance findRoleInstanceOnHost(NodeInstance node, int roleId) {
+ Collection<RoleInstance> targets = cloneOwnedContainerList();
+ String hostname = node.hostname;
+ for (RoleInstance ri : targets) {
+ if (hostname.equals(RoleHistoryUtils.hostnameOf(ri.container))
+ && ri.roleId == roleId
+ && containersBeingReleased.get(ri.getContainerId()) == null) {
+ return ri;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Release all containers.
+ * @return a list of operations to execute
+ */
+ public synchronized List<AbstractRMOperation> releaseAllContainers() {
+
+ Collection<RoleInstance> targets = cloneOwnedContainerList();
+ log.info("Releasing {} containers", targets.size());
+ List<AbstractRMOperation> operations =
+ new ArrayList<>(targets.size());
+ for (RoleInstance instance : targets) {
+ if (instance.roleId == SliderKeys.ROLE_AM_PRIORITY_INDEX) {
+ // don't worry about the AM
+ continue;
+ }
+ Container possible = instance.container;
+ ContainerId id = possible.getId();
+ if (!instance.released) {
+ String url = getLogsURLForContainer(possible);
+ log.info("Releasing container. Log: " + url);
+ try {
+ containerReleaseSubmitted(possible);
+ } catch (SliderInternalStateException e) {
+ log.warn("when releasing container {} :", possible, e);
+ }
+ operations.add(new ContainerReleaseOperation(id));
+ }
+ }
+ return operations;
+ }
+
/**
 * Event handler for allocated containers: builds up the lists
 * of assignment actions (what to run where), and possibly
 * a list of operations to perform
 * @param allocatedContainers the containers allocated
 * @param assignments output list of assignments of roles to containers;
 * cleared on entry, then filled
 * @param operations output list of allocation or release operations;
 * cleared on entry, then filled
 */
public synchronized void onContainersAllocated(List<Container> allocatedContainers,
    List<ContainerAssignment> assignments,
    List<AbstractRMOperation> operations) {
  assignments.clear();
  operations.clear();
  // let the role history order the allocations before processing them
  List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers);
  log.debug("onContainersAllocated(): Total containers allocated = {}", ordered.size());
  for (Container container : ordered) {
    final NodeId nodeId = container.getNodeId();
    String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort();
    //get the role
    final ContainerId cid = container.getId();
    final RoleStatus role = lookupRoleStatus(container);

    //dec requested count
    role.decRequested();

    //inc allocated count -this may need to be dropped in a moment,
    // but us needed to update the logic below
    final long allocated = role.incActual();
    final long desired = role.getDesired();

    final String roleName = role.getName();
    // record the allocation in the role history; this may itself
    // produce follow-up operations (e.g. cancelling escalated requests)
    final ContainerAllocationResults allocation =
        roleHistory.onContainerAllocated(container, desired, allocated);
    final ContainerAllocationOutcome outcome = allocation.outcome;

    // add all requests to the operations list
    operations.addAll(allocation.operations);

    //look for condition where we get more back than we asked
    if (allocated > desired) {
      log.info("Discarding surplus {} container {} on {}", roleName, cid, containerHostInfo);
      operations.add(new ContainerReleaseOperation(cid));
      //register as a surplus node
      surplusNodes.add(cid);
      surplusContainers.inc();
      //and, as we aren't binding it to role, dec that role's actual count
      role.decActual();
    } else {

      // Allocation being accepted -so decrement the number of outstanding requests
      decOutstandingContainerRequests();

      log.info("Assigning role {} to container" +
          " {}," +
          " on {}:{},",
          roleName,
          cid,
          nodeId.getHost(),
          nodeId.getPort());

      assignments.add(new ContainerAssignment(container, role, outcome));
      //add to the history
      roleHistory.onContainerAssigned(container);
      // now for AA requests, add some more
      if (role.isAntiAffinePlacement()) {
        role.completeOutstandingAARequest();
        // check invariants. The new node must become unavailable.
        NodeInstance node = roleHistory.getOrCreateNodeInstance(container);
        if (node.canHost(role.getKey(), role.getLabelExpression())) {
          // invariant violation: log it, but carry on
          log.error("Assigned node still declares as available {}", node.toFullString() );
        }
        if (role.getPendingAntiAffineRequests() > 0) {
          // still an outstanding AA request: need to issue a new one.
          log.info("Asking for next container for AA role {}", roleName);
          if (!addContainerRequest(operations, createAAContainerRequest(role))) {
            log.info("No capacity in cluster for new requests");
          } else {
            role.decPendingAntiAffineRequests();
          }
          log.debug("Current AA role status {}", role);
        } else {
          log.info("AA request sequence completed for role {}", role);
        }
      }

    }
  }
}
+
+ /**
+ * Get diagnostics info about containers
+ */
+ public String getContainerDiagnosticInfo() {
+ StringBuilder builder = new StringBuilder();
+ for (RoleStatus roleStatus : getRoleStatusMap().values()) {
+ builder.append(roleStatus).append('\n');
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Event handler for the list of active containers on restart.
+ * Sets the info key {@link StatusKeys#INFO_CONTAINERS_AM_RESTART}
+ * to the size of the list passed down (and does not set it if none were)
+ * @param liveContainers the containers allocated
+ * @return true if a rebuild took place (even if size 0)
+ * @throws RuntimeException on problems
+ */
+ private boolean rebuildModelFromRestart(List<Container> liveContainers)
+ throws BadClusterStateException {
+ if (liveContainers == null) {
+ return false;
+ }
+ for (Container container : liveContainers) {
+ addRestartedContainer(container);
+ }
+ clusterStatus.setInfo(StatusKeys.INFO_CONTAINERS_AM_RESTART,
+ Integer.toString(liveContainers.size()));
+ return true;
+ }
+
+ /**
+ * Add a restarted container by walking it through the create/submit/start
+ * lifecycle, so building up the internal structures
+ * @param container container that was running before the AM restarted
+ * @throws RuntimeException on problems
+ */
+ private void addRestartedContainer(Container container)
+ throws BadClusterStateException {
+ String containerHostInfo = container.getNodeId().getHost()
+ + ":" +
+ container.getNodeId().getPort();
+ // get the container ID
+ ContainerId cid = container.getId();
+
+ // get the role
+ int roleId = ContainerPriority.extractRole(container);
+ RoleStatus role =
+ lookupRoleStatus(roleId);
+ // increment
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org