You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/07/25 18:02:49 UTC
[07/50] [abbrv] hadoop git commit: YARN-6255. Refactor
yarn-native-services framework. Contributed by Jian He
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 6f54959..e891a27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -18,12 +18,12 @@
package org.apache.slider.server.appmaster.state;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -31,42 +31,35 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.ClusterDescriptionKeys;
-import org.apache.slider.api.ClusterDescriptionOperations;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
-import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.StatusKeys;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.ApplicationState;
+import org.apache.slider.api.resource.Component;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.RoleStatistics;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
-import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.ErrorStrings;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
-import org.apache.slider.core.persist.AggregateConfSerDeser;
-import org.apache.slider.core.persist.ConfTreeSerDeser;
import org.apache.slider.providers.PlacementPolicy;
import org.apache.slider.providers.ProviderRole;
-import org.apache.slider.server.appmaster.management.LongGauge;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.management.MetricsConstants;
+import org.apache.slider.server.appmaster.metrics.SliderMetrics;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
@@ -77,7 +70,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -89,12 +81,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.apache.slider.api.ResourceKeys.*;
-import static org.apache.slider.api.RoleKeys.*;
import static org.apache.slider.api.StateValues.*;
-import static org.apache.slider.providers.docker.DockerKeys.DEFAULT_DOCKER_USE_PRIVILEGED;
-import static org.apache.slider.providers.docker.DockerKeys.DOCKER_IMAGE;
-import static org.apache.slider.providers.docker.DockerKeys.DOCKER_USE_PRIVILEGED;
+import static org.apache.slider.api.resource.ApplicationState.STARTED;
/**
* The model of all the ongoing state of a Slider AM.
@@ -117,53 +107,8 @@ public class AppState {
*/
private boolean applicationLive = false;
- /**
- * The definition of the instance. Flexing updates the resources section
- * This is used as a synchronization point on activities that update
- * the CD, and also to update some of the structures that
- * feed in to the CD
- */
- private AggregateConf instanceDefinition;
-
- /**
- * Time the instance definition snapshots were created
- */
- private long snapshotTime;
-
- /**
- * Snapshot of the instance definition. This is fully
- * resolved.
- */
- private AggregateConf instanceDefinitionSnapshot;
+ private Application app;
- /**
- * Snapshot of the raw instance definition; unresolved and
- * without any patch of an AM into it.
- */
- private AggregateConf unresolvedInstanceDefinition;
-
- /**
- * snapshot of resources as of last update time
- */
- private ConfTreeOperations resourcesSnapshot;
- private ConfTreeOperations appConfSnapshot;
- private ConfTreeOperations internalsSnapshot;
-
- /**
- * This is the status, the live model
- */
- private ClusterDescription clusterStatus = new ClusterDescription();
-
- /**
- * Metadata provided by the AM for use in filling in status requests
- */
- private Map<String, String> applicationInfo;
-
- /**
- * Client properties created via the provider -static for the life
- * of the application
- */
- private Map<String, String> clientProperties = new HashMap<>();
/**
* This is a template of the cluster status
@@ -180,11 +125,6 @@ public class AppState {
new ConcurrentSkipListMap<>();
/**
- * The master node.
- */
- private RoleInstance appMasterNode;
-
- /**
* Hash map of the containers we have. This includes things that have
* been allocated but are not live; it is a superset of the live list
*/
@@ -198,37 +138,6 @@ public class AppState {
*/
private final ConcurrentMap<ContainerId, Container> containersBeingReleased =
new ConcurrentHashMap<>();
-
- /**
- * Counter for completed containers ( complete denotes successful or failed )
- */
- private final LongGauge completedContainerCount = new LongGauge();
-
- /**
- * Count of failed containers
- */
- private final LongGauge failedContainerCount = new LongGauge();
-
- /**
- * # of started containers
- */
- private final LongGauge startedContainers = new LongGauge();
-
- /**
- * # of containers that failed to start
- */
- private final LongGauge startFailedContainerCount = new LongGauge();
-
- /**
- * Track the number of surplus containers received and discarded
- */
- private final LongGauge surplusContainers = new LongGauge();
-
- /**
- * Track the number of requested containers.
- * Important: this does not include AA requests which are yet to be issued.
- */
- private final LongGauge outstandingContainerRequests = new LongGauge();
/**
* Map of requested nodes. This records the command used to start it,
@@ -256,7 +165,7 @@ public class AppState {
* Nodes that came assigned to a role above that
* which were asked for -this appears to happen
*/
- private final Set<ContainerId> surplusNodes = new HashSet<>();
+ private final Set<ContainerId> surplusContainers = new HashSet<>();
/**
* Map of containerID to cluster nodes, for status reports.
@@ -269,7 +178,6 @@ public class AppState {
private final AtomicInteger completionOfUnknownContainerEvent =
new AtomicInteger();
-
/**
* limits of container core numbers in this queue
*/
@@ -298,6 +206,7 @@ public class AppState {
private Resource minResource;
private Resource maxResource;
+ private SliderMetrics appMetrics;
/**
* Create an instance
* @param recordFactory factory for YARN records
@@ -309,60 +218,6 @@ public class AppState {
Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring");
this.recordFactory = recordFactory;
this.metricsAndMonitoring = metricsAndMonitoring;
-
- // register any metrics
- register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests);
- register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers);
- register(MetricsConstants.CONTAINERS_STARTED, startedContainers);
- register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount);
- register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount);
- register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount);
- }
-
- private void register(String name, Metric counter) {
- this.metricsAndMonitoring.getMetrics().register(
- MetricRegistry.name(AppState.class, name), counter);
- }
-
- public long getFailedCountainerCount() {
- return failedContainerCount.getCount();
- }
-
- /**
- * Increment the count
- */
- public void incFailedCountainerCount() {
- failedContainerCount.inc();
- }
-
- public long getStartFailedCountainerCount() {
- return startFailedContainerCount.getCount();
- }
-
- /**
- * Increment the count and return the new value
- */
- public void incStartedCountainerCount() {
- startedContainers.inc();
- }
-
- public long getStartedCountainerCount() {
- return startedContainers.getCount();
- }
-
- /**
- * Increment the count and return the new value
- */
- public void incStartFailedCountainerCount() {
- startFailedContainerCount.inc();
- }
-
- public AtomicInteger getCompletionOfNodeNotInLiveListEvent() {
- return completionOfNodeNotInLiveListEvent;
- }
-
- public AtomicInteger getCompletionOfUnknownContainerEvent() {
- return completionOfUnknownContainerEvent;
}
@@ -370,13 +225,7 @@ public class AppState {
return roleStatusMap;
}
- protected Map<String, ProviderRole> getRoleMap() {
- return roles;
- }
- public Map<Integer, ProviderRole> getRolePriorityMap() {
- return rolePriorityMap;
- }
private Map<ContainerId, RoleInstance> getStartingContainers() {
return startingContainers;
@@ -396,47 +245,13 @@ public class AppState {
/**
* Get the current view of the cluster status.
- * <p>
- * Calls to {@link #refreshClusterStatus()} trigger a
- * refresh of this field.
- * <p>
* This is read-only
* to the extent that changes here do not trigger updates in the
* application state.
* @return the cluster status
*/
- public synchronized ClusterDescription getClusterStatus() {
- return clusterStatus;
- }
-
- @VisibleForTesting
- protected synchronized void setClusterStatus(ClusterDescription clusterDesc) {
- this.clusterStatus = clusterDesc;
- }
-
- /**
- * Set the instance definition -this also builds the (now obsolete)
- * cluster specification from it.
- *
- * Important: this is for early binding and must not be used after the build
- * operation is complete.
- * @param definition initial definition
- * @throws BadConfigException
- */
- public synchronized void setInitialInstanceDefinition(AggregateConf definition)
- throws BadConfigException, IOException {
- log.debug("Setting initial instance definition");
- // snapshot the definition
- AggregateConfSerDeser serDeser = new AggregateConfSerDeser();
-
- unresolvedInstanceDefinition = serDeser.fromInstance(definition);
-
- this.instanceDefinition = serDeser.fromInstance(definition);
- onInstanceDefinitionUpdated();
- }
-
- public synchronized AggregateConf getInstanceDefinition() {
- return instanceDefinition;
+ public synchronized Application getClusterStatus() {
+ return app;
}
/**
@@ -475,58 +290,27 @@ public class AppState {
maxResource = recordFactory.newResource(containerMaxMemory, containerMaxCores);
}
- public ConfTreeOperations getResourcesSnapshot() {
- return resourcesSnapshot;
- }
-
- public ConfTreeOperations getAppConfSnapshot() {
- return appConfSnapshot;
- }
-
- public ConfTreeOperations getInternalsSnapshot() {
- return internalsSnapshot;
- }
-
public boolean isApplicationLive() {
return applicationLive;
}
- public long getSnapshotTime() {
- return snapshotTime;
- }
-
- public synchronized AggregateConf getInstanceDefinitionSnapshot() {
- return instanceDefinitionSnapshot;
- }
-
- public AggregateConf getUnresolvedInstanceDefinition() {
- return unresolvedInstanceDefinition;
- }
public synchronized void buildInstance(AppStateBindingInfo binding)
throws BadClusterStateException, BadConfigException, IOException {
binding.validate();
log.debug("Building application state");
- publishedProviderConf = binding.publishedProviderConf;
- applicationInfo = binding.applicationInfo != null ? binding.applicationInfo
- : new HashMap<String, String>();
-
- clientProperties = new HashMap<>();
containerReleaseSelector = binding.releaseSelector;
-
- Set<String> confKeys = ConfigHelper.sortedConfigKeys(publishedProviderConf);
-
- // Add the -site configuration properties
- for (String key : confKeys) {
- String val = publishedProviderConf.get(key);
- clientProperties.put(key, val);
- }
-
// set the cluster specification (once its dependency the client properties
// is out the way
- setInitialInstanceDefinition(binding.instanceDefinition);
+ this.app = binding.application;
+ appMetrics = SliderMetrics.register(app.getName(),
+ "Metrics for service");
+ appMetrics
+ .tag("type", "Metrics type [component or service]", "service");
+ appMetrics
+ .tag("appId", "Application id for service", app.getId());
//build the initial role list
List<ProviderRole> roleList = new ArrayList<>(binding.roles);
@@ -534,51 +318,40 @@ public class AppState {
buildRole(providerRole);
}
- ConfTreeOperations resources = instanceDefinition.getResourceOperations();
-
- Set<String> roleNames = resources.getComponentNames();
- for (String name : roleNames) {
+ int priority = 1;
+ for (Component component : app.getComponents()) {
+ String name = component.getName();
if (roles.containsKey(name)) {
continue;
}
- if (hasUniqueNames(resources, name)) {
- log.info("Skipping group {}", name);
+ if (component.getUniqueComponentSupport()) {
+ log.info("Skipping group " + name + ", as it's unique component");
continue;
}
- // this is a new value
- log.info("Adding role {}", name);
- MapOperations resComponent = resources.getComponent(name);
- ProviderRole dynamicRole = createDynamicProviderRole(name, resComponent);
+ log.info("Adding component: " + name);
+ ProviderRole dynamicRole =
+ createComponent(name, name, component, priority++);
buildRole(dynamicRole);
roleList.add(dynamicRole);
}
//then pick up the requirements
buildRoleRequirementsFromResources();
- //set the livespan
- MapOperations globalResOpts = instanceDefinition.getResourceOperations().getGlobalOptions();
-
- startTimeThreshold = globalResOpts.getOptionInt(
- InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE,
- InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE);
-
- failureThreshold = globalResOpts.getOptionInt(
- CONTAINER_FAILURE_THRESHOLD,
+ org.apache.slider.api.resource.Configuration conf = app.getConfiguration();
+ startTimeThreshold =
+ conf.getPropertyLong(InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE,
+ InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE);
+ failureThreshold = (int) conf.getPropertyLong(CONTAINER_FAILURE_THRESHOLD,
DEFAULT_CONTAINER_FAILURE_THRESHOLD);
- nodeFailureThreshold = globalResOpts.getOptionInt(
- NODE_FAILURE_THRESHOLD,
+ nodeFailureThreshold = (int) conf.getPropertyLong(NODE_FAILURE_THRESHOLD,
DEFAULT_NODE_FAILURE_THRESHOLD);
- initClusterStatus();
-
// set up the role history
roleHistory = new RoleHistory(roleStatusMap.values(), recordFactory);
- roleHistory.register(metricsAndMonitoring);
roleHistory.onStart(binding.fs, binding.historyPath);
// trigger first node update
roleHistory.onNodesUpdated(binding.nodeReports);
-
//rebuild any live containers
rebuildModelFromRestart(binding.liveContainers);
@@ -586,180 +359,57 @@ public class AppState {
logServerURL = binding.serviceConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, "");
//mark as live
applicationLive = true;
- }
-
- public void initClusterStatus() {
- //copy into cluster status.
- ClusterDescription status = ClusterDescription.copy(clusterStatusTemplate);
- status.state = STATE_CREATED;
- MapOperations infoOps = new MapOperations("info", status.info);
- infoOps.mergeWithoutOverwrite(applicationInfo);
- SliderUtils.addBuildInfo(infoOps, "status");
-
- long now = now();
- status.setInfoTime(StatusKeys.INFO_LIVE_TIME_HUMAN,
- StatusKeys.INFO_LIVE_TIME_MILLIS,
- now);
- SliderUtils.setInfoTime(infoOps,
- StatusKeys.INFO_LIVE_TIME_HUMAN,
- StatusKeys.INFO_LIVE_TIME_MILLIS,
- now);
- if (0 == status.createTime) {
- status.createTime = now;
- SliderUtils.setInfoTime(infoOps,
- StatusKeys.INFO_CREATE_TIME_HUMAN,
- StatusKeys.INFO_CREATE_TIME_MILLIS,
- now);
- }
- status.state = STATE_LIVE;
-
- //set the app state to this status
- setClusterStatus(status);
- }
-
- /**
- * Build a dynamic provider role
- * @param name name of role
- * @return a new provider role
- * @throws BadConfigException bad configuration
- */
- public ProviderRole createDynamicProviderRole(String name, MapOperations component)
- throws BadConfigException {
- return createDynamicProviderRole(name, name, component);
- }
-
- /**
- * Build a dynamic provider role
- * @param name name of role
- * @param group group of role
- * @return a new provider role
- * @throws BadConfigException bad configuration
- */
- public ProviderRole createDynamicProviderRole(String name, String group, MapOperations component)
- throws BadConfigException {
- String priOpt = component.getMandatoryOption(COMPONENT_PRIORITY);
- int priority = SliderUtils.parseAndValidate(
- "value of " + name + " " + COMPONENT_PRIORITY, priOpt, 0, 1, -1);
-
- String placementOpt = component.getOption(COMPONENT_PLACEMENT_POLICY,
- Integer.toString(PlacementPolicy.DEFAULT));
-
- int placement = SliderUtils.parseAndValidate(
- "value of " + name + " " + COMPONENT_PLACEMENT_POLICY, placementOpt, 0, 0, -1);
-
- int placementTimeout = component.getOptionInt(PLACEMENT_ESCALATE_DELAY,
- DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS);
-
- ProviderRole newRole = new ProviderRole(name,
- group,
- priority,
- placement,
- getNodeFailureThresholdForRole(group),
- placementTimeout,
- component.getOption(YARN_LABEL_EXPRESSION, DEF_YARN_LABEL_EXPRESSION));
- log.info("New {} ", newRole);
+ app.setState(STARTED);
+ }
+
+ // TODO: why do we need to create the component for the AM?
+ public ProviderRole createComponent(String name, String group,
+ Component component, int priority) throws BadConfigException {
+
+ org.apache.slider.api.resource.Configuration conf =
+ component.getConfiguration();
+ long placementTimeout = conf.getPropertyLong(PLACEMENT_ESCALATE_DELAY,
+ DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS);
+ long placementPolicy = conf.getPropertyLong(COMPONENT_PLACEMENT_POLICY,
+ PlacementPolicy.DEFAULT);
+ int threshold = (int) conf
+ .getPropertyLong(NODE_FAILURE_THRESHOLD, nodeFailureThreshold);
+ ProviderRole newRole =
+ new ProviderRole(name, group, priority, (int)placementPolicy, threshold,
+ placementTimeout, "", component);
+
+ log.info("Created a new role " + newRole);
return newRole;
}
- /**
- * Actions to perform when an instance definition is updated
- * Currently:
- * <ol>
- * <li>
- * resolve the configuration
- * </li>
- * <li>
- * update the cluster spec derivative
- * </li>
- * </ol>
- *
- * @throws BadConfigException
- */
- private synchronized void onInstanceDefinitionUpdated()
- throws BadConfigException, IOException {
-
- log.debug("Instance definition updated");
- //note the time
- snapshotTime = now();
-
- for (String component : instanceDefinition.getResourceOperations().getComponentNames()) {
- instanceDefinition.getAppConfOperations().getOrAddComponent(component);
- }
-
- // resolve references if not already done
- instanceDefinition.resolve();
-
- // force in the AM desired state values
- ConfTreeOperations resources = instanceDefinition.getResourceOperations();
-
- if (resources.getComponent(SliderKeys.COMPONENT_AM) != null) {
- resources.setComponentOpt(
- SliderKeys.COMPONENT_AM, COMPONENT_INSTANCES, "1");
- }
-
-
- //snapshot all three sectons
- resourcesSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getResources());
- appConfSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getAppConf());
- internalsSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getInternal());
- //build a new aggregate from the snapshots
- instanceDefinitionSnapshot = new AggregateConf(resourcesSnapshot.confTree,
- appConfSnapshot.confTree,
- internalsSnapshot.confTree);
- instanceDefinitionSnapshot.setName(instanceDefinition.getName());
-
- clusterStatusTemplate = ClusterDescriptionOperations.buildFromInstanceDefinition(
- instanceDefinition);
-
- // Add the -site configuration properties
- for (Map.Entry<String, String> prop : clientProperties.entrySet()) {
- clusterStatusTemplate.clientProperties.put(prop.getKey(), prop.getValue());
+ public synchronized void updateComponents(
+ Messages.FlexComponentRequestProto requestProto)
+ throws BadConfigException {
+ for (Component component : app.getComponents()) {
+ if (component.getName().equals(requestProto.getName())) {
+ component
+ .setNumberOfContainers((long) requestProto.getNumberOfContainers());
+ }
}
-
- }
-
- /**
- * The resource configuration is updated -review and update state.
- * @param resources updated resources specification
- * @return a list of any dynamically added provider roles
- * (purely for testing purposes)
- */
- @VisibleForTesting
- public synchronized List<ProviderRole> updateResourceDefinitions(ConfTree resources)
- throws BadConfigException, IOException {
- log.debug("Updating resources to {}", resources);
- // snapshot the (possibly unresolved) values
- ConfTreeSerDeser serDeser = new ConfTreeSerDeser();
- unresolvedInstanceDefinition.setResources(
- serDeser.fromInstance(resources));
- // assign another copy under the instance definition for resolving
- // and then driving application size
- instanceDefinition.setResources(serDeser.fromInstance(resources));
- onInstanceDefinitionUpdated();
-
- // propagate the role table
- Map<String, Map<String, String>> updated = resources.components;
- getClusterStatus().roles = SliderUtils.deepClone(updated);
- getClusterStatus().updateTime = now();
- return buildRoleRequirementsFromResources();
+ //TODO update cluster description
+ buildRoleRequirementsFromResources();
}
/**
* build the role requirements from the cluster specification
* @return a list of any dynamically added provider roles
*/
- private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException {
+ private List<ProviderRole> buildRoleRequirementsFromResources()
+ throws BadConfigException {
List<ProviderRole> newRoles = new ArrayList<>(0);
// now update every role's desired count.
// if there are no instance values, that role count goes to zero
-
- ConfTreeOperations resources =
- instanceDefinition.getResourceOperations();
-
// Add all the existing roles
+ // component name -> number of containers
Map<String, Integer> groupCounts = new HashMap<>();
+
for (RoleStatus roleStatus : getRoleStatusMap().values()) {
if (roleStatus.isExcludeFromFlexing()) {
// skip inflexible roles, e.g AM itself
@@ -768,10 +418,11 @@ public class AppState {
long currentDesired = roleStatus.getDesired();
String role = roleStatus.getName();
String roleGroup = roleStatus.getGroup();
- int desiredInstanceCount = getDesiredInstanceCount(resources, roleGroup);
+ Component component = roleStatus.getProviderRole().component;
+ int desiredInstanceCount = component.getNumberOfContainers().intValue();
int newDesired = desiredInstanceCount;
- if (hasUniqueNames(resources, roleGroup)) {
+ if (component.getUniqueComponentSupport()) {
Integer groupCount = 0;
if (groupCounts.containsKey(roleGroup)) {
groupCount = groupCounts.get(roleGroup);
@@ -793,56 +444,54 @@ public class AppState {
if (currentDesired != newDesired) {
log.info("Role {} flexed from {} to {}", role, currentDesired,
newDesired);
- roleStatus.setDesired(newDesired);
+ setDesiredContainers(roleStatus, newDesired);
}
}
// now the dynamic ones. Iterate through the the cluster spec and
// add any role status entries not in the role status
- Set<String> roleNames = resources.getComponentNames();
- for (String name : roleNames) {
+
+ List<RoleStatus> list = new ArrayList<>(getRoleStatusMap().values());
+ for (RoleStatus roleStatus : list) {
+ String name = roleStatus.getName();
+ Component component = roleStatus.getProviderRole().component;
if (roles.containsKey(name)) {
continue;
}
- if (hasUniqueNames(resources, name)) {
+ if (component.getUniqueComponentSupport()) {
// THIS NAME IS A GROUP
- int desiredInstanceCount = getDesiredInstanceCount(resources, name);
+ int desiredInstanceCount = component.getNumberOfContainers().intValue();
Integer groupCount = 0;
if (groupCounts.containsKey(name)) {
groupCount = groupCounts.get(name);
}
for (int i = groupCount + 1; i <= desiredInstanceCount; i++) {
- int priority = resources.getComponentOptInt(name, COMPONENT_PRIORITY, i);
+ int priority = roleStatus.getPriority();
// this is a new instance of an existing group
String newName = String.format("%s%d", name, i);
int newPriority = getNewPriority(priority + i - 1);
log.info("Adding new role {}", newName);
- MapOperations component = resources.getComponent(name,
- Collections.singletonMap(COMPONENT_PRIORITY,
- Integer.toString(newPriority)));
- if (component == null) {
- throw new BadConfigException("Component is null for name = " + name
- + ", newPriority =" + newPriority);
- }
- ProviderRole dynamicRole = createDynamicProviderRole(newName, name, component);
- RoleStatus roleStatus = buildRole(dynamicRole);
- roleStatus.setDesired(1);
- log.info("New role {}", roleStatus);
+ ProviderRole dynamicRole =
+ createComponent(newName, name, component, newPriority);
+ RoleStatus newRole = buildRole(dynamicRole);
+ incDesiredContainers(newRole);
+ log.info("New role {}", newRole);
if (roleHistory != null) {
- roleHistory.addNewRole(roleStatus);
+ roleHistory.addNewRole(newRole);
}
newRoles.add(dynamicRole);
}
} else {
// this is a new value
log.info("Adding new role {}", name);
- MapOperations component = resources.getComponent(name);
- ProviderRole dynamicRole = createDynamicProviderRole(name, component);
- RoleStatus roleStatus = buildRole(dynamicRole);
- roleStatus.setDesired(getDesiredInstanceCount(resources, name));
- log.info("New role {}", roleStatus);
+ ProviderRole dynamicRole =
+ createComponent(name, name, component, roleStatus.getPriority());
+ RoleStatus newRole = buildRole(dynamicRole);
+ incDesiredContainers(roleStatus,
+ component.getNumberOfContainers().intValue());
+ log.info("New role {}", newRole);
if (roleHistory != null) {
- roleHistory.addNewRole(roleStatus);
+ roleHistory.addNewRole(newRole);
}
newRoles.add(dynamicRole);
}
@@ -861,37 +510,6 @@ public class AppState {
}
/**
- * Get the desired instance count of a role, rejecting negative values
- * @param resources resource map
- * @param roleGroup role group
- * @return the instance count
- * @throws BadConfigException if the count is negative
- */
- private int getDesiredInstanceCount(ConfTreeOperations resources,
- String roleGroup) throws BadConfigException {
- int desiredInstanceCount =
- resources.getComponentOptInt(roleGroup, COMPONENT_INSTANCES, 0);
-
- if (desiredInstanceCount < 0) {
- log.error("Role {} has negative desired instances : {}", roleGroup,
- desiredInstanceCount);
- throw new BadConfigException(
- "Negative instance count (%) requested for component %s",
- desiredInstanceCount, roleGroup);
- }
- return desiredInstanceCount;
- }
-
- private Boolean hasUniqueNames(ConfTreeOperations resources, String group) {
- MapOperations component = resources.getComponent(group);
- if (component == null) {
- log.info("Component was null for {} when checking unique names", group);
- return Boolean.FALSE;
- }
- return component.getOptionBool(UNIQUE_NAMES, Boolean.FALSE);
- }
-
- /**
* Add knowledge of a role.
* This is a build-time operation that is not synchronized, and
* should be used while setting up the system state -before servicing
@@ -923,66 +541,9 @@ public class AppState {
*/
private void buildRoleResourceRequirements() {
for (RoleStatus role : roleStatusMap.values()) {
- role.setResourceRequirements(
- buildResourceRequirements(role, recordFactory.newResource()));
+ role.setResourceRequirements(buildResourceRequirements(role));
}
}
-
- /**
- * build up the special master node, which lives
- * in the live node set but has a lifecycle bonded to the AM
- * @param containerId the AM master
- * @param host hostname
- * @param amPort port
- * @param nodeHttpAddress http address: may be null
- */
- public void buildAppMasterNode(ContainerId containerId,
- String host,
- int amPort,
- String nodeHttpAddress) {
- Container container = new ContainerPBImpl();
- container.setId(containerId);
- NodeId nodeId = NodeId.newInstance(host, amPort);
- container.setNodeId(nodeId);
- container.setNodeHttpAddress(nodeHttpAddress);
- RoleInstance am = new RoleInstance(container);
- am.role = SliderKeys.COMPONENT_AM;
- am.group = SliderKeys.COMPONENT_AM;
- am.roleId = SliderKeys.ROLE_AM_PRIORITY_INDEX;
- am.createTime =now();
- am.startTime = am.createTime;
- appMasterNode = am;
- //it is also added to the set of live nodes
- getLiveContainers().put(containerId, am);
- putOwnedContainer(containerId, am);
-
- // patch up the role status
- RoleStatus roleStatus = roleStatusMap.get(SliderKeys.ROLE_AM_PRIORITY_INDEX);
- roleStatus.setDesired(1);
- roleStatus.incActual();
- roleStatus.incStarted();
- }
-
- /**
- * Note that the master node has been launched,
- * though it isn't considered live until any forked
- * processes are running. It is NOT registered with
- * the role history -the container is incomplete
- * and it will just cause confusion
- */
- public void noteAMLaunched() {
- getLiveContainers().put(appMasterNode.getContainerId(), appMasterNode);
- }
-
- /**
- * AM declares ourselves live in the cluster description.
- * This is meant to be triggered from the callback
- * indicating the spawned process is up and running.
- */
- public void noteAMLive() {
- appMasterNode.state = STATE_LIVE;
- }
-
/**
* Look up the status entry of a role or raise an exception
* @param key role ID
@@ -1008,24 +569,6 @@ public class AppState {
return lookupRoleStatus(ContainerPriority.extractRole(c));
}
- /**
- * Get a deep clone of the role status list. Concurrent events may mean this
- * list (or indeed, some of the role status entries) may be inconsistent
- * @return a snapshot of the role status entries
- */
- public List<RoleStatus> cloneRoleStatusList() {
- Collection<RoleStatus> statuses = roleStatusMap.values();
- List<RoleStatus> statusList = new ArrayList<>(statuses.size());
- try {
- for (RoleStatus status : statuses) {
- statusList.add((RoleStatus)(status.clone()));
- }
- } catch (CloneNotSupportedException e) {
- log.warn("Unexpected cloning failure: {}", e, e);
- }
- return statusList;
- }
-
/**
* Look up a role in the map
@@ -1278,8 +821,6 @@ public class AppState {
}
instance.released = true;
containersBeingReleased.put(id, instance.container);
- RoleStatus role = lookupRoleStatus(instance.roleId);
- role.incReleasing();
roleHistory.onContainerReleaseSubmitted(container);
}
@@ -1292,10 +833,10 @@ public class AppState {
* @return the container request to submit or null if there is none
*/
private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) {
+ incPendingContainers(role);
if (role.isAntiAffinePlacement()) {
return createAAContainerRequest(role);
} else {
- incrementRequestCount(role);
OutstandingRequest request = roleHistory.requestContainerForRole(role);
if (request != null) {
return request.getIssuedRequest();
@@ -1318,69 +859,69 @@ public class AppState {
if (request == null) {
return null;
}
- incrementRequestCount(role);
role.setOutstandingAArequest(request);
return request.getIssuedRequest();
}
- /**
- * Increment the request count of a role.
- * <p>
- * Also updates application state counters
- * @param role role being requested.
- */
- protected void incrementRequestCount(RoleStatus role) {
- role.incRequested();
- incOutstandingContainerRequests();
+ /**
+ * Increment the pending-container metric of a role and the matching
+ * metric in {@code appMetrics}.
+ * @param role role whose component metrics are updated
+ */
+ private void incPendingContainers(RoleStatus role) {
+ role.getComponentMetrics().containersPending.incr();
+ appMetrics.containersPending.incr();
}
- /**
- * Inc #of outstanding requests.
- */
- private void incOutstandingContainerRequests() {
- outstandingContainerRequests.inc();
+ /**
+ * Decrement the pending-container metrics by one.
+ * @param role role whose component metrics are updated
+ */
+ private void decPendingContainers(RoleStatus role) {
+ decPendingContainers(role, 1);
}
- /**
- * Decrement the number of outstanding requests. This never goes below zero.
- */
- private void decOutstandingContainerRequests() {
- synchronized (outstandingContainerRequests) {
- if (outstandingContainerRequests.getCount() > 0) {
- // decrement but never go below zero
- outstandingContainerRequests.dec();
- }
- }
+ /**
+ * Decrement the pending-container metric of a role, and the matching
+ * metric in {@code appMetrics}, by {@code n}.
+ * No lower bound is applied here.
+ * @param role role whose component metrics are updated
+ * @param n amount to subtract from each metric
+ */
+ private void decPendingContainers(RoleStatus role, int n) {
+ role.getComponentMetrics().containersPending.decr(n);
+ appMetrics.containersPending.decr(n);
}
- /**
- * Get the value of a YARN requirement (cores, RAM, etc).
- * These are returned as integers, but there is special handling of the
- * string {@link ResourceKeys#YARN_RESOURCE_MAX}, which triggers
- * the return of the maximum value.
- * @param group component to get from
- * @param option option name
- * @param defVal default value
- * @param maxVal value to return if the max val is requested
- * @return parsed value
- * @throws NumberFormatException if the role could not be parsed.
- */
- private int getResourceRequirement(ConfTreeOperations resources,
- String group,
- String option,
- int defVal,
- int maxVal) {
+ /**
+ * Increment the running-container metric of a role and the matching
+ * metric in {@code appMetrics}.
+ * @param role role whose component metrics are updated
+ */
+ private void incRunningContainers(RoleStatus role) {
+ role.getComponentMetrics().containersRunning.incr();
+ appMetrics.containersRunning.incr();
+ }
- String val = resources.getComponentOpt(group, option,
- Integer.toString(defVal));
- Integer intVal;
- if (YARN_RESOURCE_MAX.equals(val)) {
- intVal = maxVal;
- } else {
- intVal = Integer.decode(val);
+ /**
+ * Decrement the running-container metric of a role and the matching
+ * metric in {@code appMetrics}.
+ * @param role role whose component metrics are updated
+ */
+ private void decRunningContainers(RoleStatus role) {
+ role.getComponentMetrics().containersRunning.decr();
+ appMetrics.containersRunning.decr();
+ }
+
+ /**
+ * Set the desired container count of a role and apply the resulting
+ * change to the matching metric in {@code appMetrics}.
+ * The {@code appMetrics} value is adjusted by the delta rather than
+ * overwritten, so that setting one role's count does not replace a
+ * value that the inc/dec helpers maintain across all roles.
+ * @param role role whose desired count is being set
+ * @param n new desired count for the role
+ */
+ private void setDesiredContainers(RoleStatus role, int n) {
+ // compute the delta against the role's previous value before it is replaced
+ appMetrics.containersDesired.incr(
+ n - role.getComponentMetrics().containersDesired.value());
+ role.getComponentMetrics().containersDesired.set(n);
+ }
+
+ /**
+ * Increment the desired-container metric of a role and the matching
+ * metric in {@code appMetrics} by one.
+ * @param role role whose component metrics are updated
+ */
+ private void incDesiredContainers(RoleStatus role) {
+ role.getComponentMetrics().containersDesired.incr();
+ appMetrics.containersDesired.incr();
+ }
+
+ /**
+ * Increase the desired-container metric of a role and the matching
+ * metric in {@code appMetrics} by {@code n}.
+ * @param role role whose component metrics are updated
+ * @param n amount to add to each metric
+ */
+ private void incDesiredContainers(RoleStatus role, int n) {
+ role.getComponentMetrics().containersDesired.incr(n);
+ appMetrics.containersDesired.incr(n);
+ }
+
+ /**
+ * Increment the completed-container metric of a role and the matching
+ * metric in {@code appMetrics}.
+ * @param role role whose component metrics are updated
+ */
+ private void incCompletedContainers(RoleStatus role) {
+ role.getComponentMetrics().containersCompleted.incr();
+ appMetrics.containersCompleted.incr();
+ }
+
+ /**
+ * Record a container failure in a role's component metrics and in
+ * {@code appMetrics}.
+ * The failed-container metric is always incremented on both. The outcome
+ * then selects an extra metric: a preemption is counted on both the role
+ * and {@code appMetrics}, while a plain failure only increments
+ * {@code appMetrics.failedSinceLastThreshold}.
+ * @param role role whose component metrics are updated
+ * @param outcome outcome of the container; selects the extra metric
+ */
+ private void incFailedContainers(RoleStatus role, ContainerOutcome outcome) {
+ role.getComponentMetrics().containersFailed.incr();
+ appMetrics.containersFailed.incr();
+ switch (outcome) {
+ case Preempted:
+ appMetrics.containersPreempted.incr();
+ role.getComponentMetrics().containersPreempted.incr();
+ break;
+ case Failed:
+ appMetrics.failedSinceLastThreshold.incr();
+ break;
+ default:
+ // any other outcome adds nothing beyond the two failure metrics
+ break;
}
- return intVal;
}
/**
@@ -1388,26 +929,28 @@ public class AppState {
* cluster specification, including substituing max allowed values
* if the specification asked for it.
* @param role role
- * @param capability capability to set up. A new one may be created
* during normalization
*/
- public Resource buildResourceRequirements(RoleStatus role, Resource capability) {
+ public Resource buildResourceRequirements(RoleStatus role) {
// Set up resource requirements from role values
String name = role.getName();
- String group = role.getGroup();
- ConfTreeOperations resources = getResourcesSnapshot();
- int cores = getResourceRequirement(resources,
- group,
- YARN_CORES,
- DEF_YARN_CORES,
- containerMaxCores);
- capability.setVirtualCores(cores);
- int ram = getResourceRequirement(resources, group,
- YARN_MEMORY,
- DEF_YARN_MEMORY,
- containerMaxMemory);
- capability.setMemory(ram);
- log.debug("Component {} has RAM={}, vCores ={}", name, ram, cores);
+ Component component = role.getProviderRole().component;
+ if (component == null) {
+ // this is for AM container, whose provider role carries no component:
+ // fall back to 512 MB of memory and 1 vcore.
+ // Resource.newInstance takes (memory, vCores), in that order.
+ // TODO why do we need to create the component for AM ?
+ return Resource.newInstance(512, 1);
+ }
+ int cores = Math.min(containerMaxCores, component.getResource().getCpus());
+ if (cores <= 0) {
+ cores = DEF_YARN_CORES;
+ }
+ long mem = Math.min(containerMaxMemory,
+ Long.parseLong(component.getResource().getMemory()));
+ if (mem <= 0) {
+ mem = DEF_YARN_MEMORY;
+ }
+ Resource capability = Resource.newInstance(mem, cores);
+ log.debug("Component {} has RAM={}, vCores ={}", name, mem, cores);
Resource normalized = recordFactory.normalize(capability, minResource,
maxResource);
if (!Resources.equals(normalized, capability)) {
@@ -1459,7 +1002,6 @@ public class AppState {
*/
@VisibleForTesting
public RoleInstance innerOnNodeManagerContainerStarted(ContainerId containerId) {
- incStartedCountainerCount();
RoleInstance instance = getOwnedContainer(containerId);
if (instance == null) {
//serious problem
@@ -1477,8 +1019,6 @@ public class AppState {
"Container "+ containerId +" is already started");
}
instance.state = STATE_LIVE;
- RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
- roleStatus.incStarted();
Container container = instance.container;
addLaunchedContainer(container, instance);
return instance;
@@ -1497,8 +1037,6 @@ public class AppState {
public synchronized void onNodeManagerContainerStartFailed(ContainerId containerId,
Throwable thrown) {
removeOwnedContainer(containerId);
- incFailedCountainerCount();
- incStartFailedCountainerCount();
RoleInstance instance = getStartingContainers().remove(containerId);
if (null != instance) {
RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
@@ -1509,9 +1047,10 @@ public class AppState {
text = "container start failure";
}
instance.diagnostics = text;
- roleStatus.noteFailed(true, text, ContainerOutcome.Failed);
+ roleStatus.noteFailed(text);
getFailedContainers().put(containerId, instance);
roleHistory.onNodeManagerContainerStartFailed(instance.container);
+ incFailedContainers(roleStatus, ContainerOutcome.Failed);
}
}
@@ -1607,7 +1146,8 @@ public class AppState {
* @param status the node that has just completed
* @return NodeCompletionResult
*/
- public synchronized NodeCompletionResult onCompletedNode(ContainerStatus status) {
+ public synchronized NodeCompletionResult onCompletedContainer(
+ ContainerStatus status) {
ContainerId containerId = status.getContainerId();
NodeCompletionResult result = new NodeCompletionResult();
RoleInstance roleInstance;
@@ -1618,18 +1158,16 @@ public class AppState {
log.info("Container was queued for release : {}", containerId);
Container container = containersBeingReleased.remove(containerId);
RoleStatus roleStatus = lookupRoleStatus(container);
- long releasing = roleStatus.decReleasing();
- long actual = roleStatus.decActual();
- long completedCount = roleStatus.incCompleted();
- log.info("decrementing role count for role {} to {}; releasing={}, completed={}",
+ decRunningContainers(roleStatus);
+ incCompletedContainers(roleStatus);
+ log.info("decrementing role count for role {} to {}; completed={}",
roleStatus.getName(),
- actual,
- releasing,
- completedCount);
+ roleStatus.getComponentMetrics().containersRunning.value(),
+ roleStatus.getComponentMetrics().containersCompleted.value());
result.outcome = ContainerOutcome.Completed;
roleHistory.onReleaseCompleted(container);
- } else if (surplusNodes.remove(containerId)) {
+ } else if (surplusContainers.remove(containerId)) {
//its a surplus one being purged
result.surplusNode = true;
} else {
@@ -1640,8 +1178,8 @@ public class AppState {
roleInstance = removeOwnedContainer(containerId);
if (roleInstance != null) {
- //it was active, move it to failed
- incFailedCountainerCount();
+ RoleStatus roleStatus = lookupRoleStatus(roleInstance.roleId);
+ incFailedContainers(roleStatus, result.outcome);
failedContainers.put(containerId, roleInstance);
} else {
// the container may have been noted as failed already, so look
@@ -1653,8 +1191,8 @@ public class AppState {
String rolename = roleInstance.role;
log.info("Failed container in role[{}] : {}", roleId, rolename);
try {
- RoleStatus roleStatus = lookupRoleStatus(roleId);
- roleStatus.decActual();
+ RoleStatus roleStatus = lookupRoleStatus(roleInstance.roleId);
+ decRunningContainers(roleStatus);
boolean shortLived = isShortLived(roleInstance);
String message;
Container failedContainer = roleInstance.container;
@@ -1670,8 +1208,10 @@ public class AppState {
} else {
message = String.format("Failure %s (%d)", containerId, exitStatus);
}
- roleStatus.noteFailed(shortLived, message, result.outcome);
- long failed = roleStatus.getFailed();
+ roleStatus.noteFailed(message);
+ incFailedContainers(roleStatus, result.outcome);
+ long failed =
+ roleStatus.getComponentMetrics().containersFailed.value();
log.info("Current count of failed role[{}] {} = {}",
roleId, rolename, failed);
if (failedContainer != null) {
@@ -1761,7 +1301,7 @@ public class AppState {
float actual = 0;
for (RoleStatus role : getRoleStatusMap().values()) {
desired += role.getDesired();
- actual += role.getActual();
+ actual += role.getRunning();
}
if (desired == 0) {
percentage = 100;
@@ -1771,29 +1311,26 @@ public class AppState {
return percentage;
}
+
/**
* Update the cluster description with the current application state
*/
- public ClusterDescription refreshClusterStatus() {
- return refreshClusterStatus(null);
- }
+ public synchronized Application refreshClusterStatus() {
+
+ //TODO replace ClusterDescription with Application + related statistics
+ //TODO build container stats
+ app.setState(ApplicationState.STARTED);
+ return app;
+/*
+ return app;
- /**
- * Update the cluster description with the current application state
- * @param providerStatus status from the provider for the cluster info section
- */
- public synchronized ClusterDescription refreshClusterStatus(Map<String, String> providerStatus) {
ClusterDescription cd = getClusterStatus();
long now = now();
cd.setInfoTime(StatusKeys.INFO_STATUS_TIME_HUMAN,
StatusKeys.INFO_STATUS_TIME_MILLIS,
now);
- if (providerStatus != null) {
- for (Map.Entry<String, String> entry : providerStatus.entrySet()) {
- cd.setInfo(entry.getKey(), entry.getValue());
- }
- }
+
MapOperations infoOps = new MapOperations("info", cd.info);
infoOps.mergeWithoutOverwrite(applicationInfo);
SliderUtils.addBuildInfo(infoOps, "status");
@@ -1810,32 +1347,8 @@ public class AppState {
cd.status = new HashMap<>();
cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, clusterNodes);
-
for (RoleStatus role : getRoleStatusMap().values()) {
String rolename = role.getName();
- if (hasUniqueNames(instanceDefinition.getResourceOperations(),
- role.getGroup())) {
- cd.setRoleOpt(rolename, COMPONENT_PRIORITY, role.getPriority());
- cd.setRoleOpt(rolename, ROLE_GROUP, role.getGroup());
- MapOperations groupOptions = instanceDefinition.getResourceOperations()
- .getComponent(role.getGroup());
- SliderUtils.mergeMapsIgnoreDuplicateKeys(cd.getRole(rolename),
- groupOptions.options);
- }
- String prefix = instanceDefinition.getAppConfOperations()
- .getComponentOpt(role.getGroup(), ROLE_PREFIX, null);
- if (SliderUtils.isSet(prefix)) {
- cd.setRoleOpt(rolename, ROLE_PREFIX, SliderUtils.trimPrefix(prefix));
- }
- String dockerImage = instanceDefinition.getAppConfOperations()
- .getComponentOpt(role.getGroup(), DOCKER_IMAGE, null);
- if (SliderUtils.isSet(dockerImage)) {
- cd.setRoleOpt(rolename, DOCKER_IMAGE, dockerImage);
- Boolean dockerUsePrivileged = instanceDefinition.getAppConfOperations()
- .getComponentOptBool(role.getGroup(), DOCKER_USE_PRIVILEGED,
- DEFAULT_DOCKER_USE_PRIVILEGED);
- cd.setRoleOpt(rolename, DOCKER_USE_PRIVILEGED, dockerUsePrivileged);
- }
List<String> instances = instanceMap.get(rolename);
int nodeCount = instances != null ? instances.size(): 0;
cd.setRoleOpt(rolename, COMPONENT_INSTANCES,
@@ -1861,7 +1374,7 @@ public class AppState {
// liveness
cd.liveness = getApplicationLivenessInformation();
- return cd;
+ return cd;*/
}
/**
@@ -1878,29 +1391,6 @@ public class AppState {
return li;
}
- /**
- * Get the live statistics map
- * @return a map of statistics values, defined in the {@link StatusKeys}
- * keylist.
- */
- protected Map<String, Integer> getLiveStatistics() {
- Map<String, Integer> sliderstats = new HashMap<>();
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE,
- liveNodes.size());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED,
- completedContainerCount.intValue());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED,
- failedContainerCount.intValue());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED,
- startedContainers.intValue());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED,
- startFailedContainerCount.intValue());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS,
- surplusContainers.intValue());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED,
- completionOfUnknownContainerEvent.get());
- return sliderstats;
- }
/**
* Get the aggregate statistics across all roles
@@ -1949,7 +1439,7 @@ public class AppState {
*/
public synchronized List<AbstractRMOperation> reviewRequestAndReleaseNodes()
throws SliderInternalStateException, TriggerClusterTeardownException {
- log.debug("in reviewRequestAndReleaseNodes()");
+ log.info("in reviewRequestAndReleaseNodes()");
List<AbstractRMOperation> allOperations = new ArrayList<>();
AbstractRMOperation blacklistOperation = updateBlacklist();
if (blacklistOperation != null) {
@@ -1981,15 +1471,11 @@ public class AppState {
if (failures > threshold) {
throw new TriggerClusterTeardownException(
- SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
- FinalApplicationStatus.FAILED, ErrorStrings.E_UNSTABLE_CLUSTER +
- " - failed with component %s failed 'recently' %d times (%d in startup);" +
- " threshold is %d - last failure: %s",
- role.getName(),
- role.getFailed(),
- role.getStartFailed(),
- threshold,
- role.getFailureMessage());
+ SliderExitCodes.EXIT_DEPLOYMENT_FAILED, FinalApplicationStatus.FAILED,
+ ErrorStrings.E_UNSTABLE_CLUSTER
+ + " - failed with component %s failed 'recently' %d times;"
+ + " threshold is %d - last failure: %s", role.getName(),
+ role.getFailedRecently(), threshold, role.getFailureMessage());
}
}
@@ -2000,26 +1486,11 @@ public class AppState {
* @return the threshold for failures
*/
private int getFailureThresholdForRole(RoleStatus roleStatus) {
- ConfTreeOperations resources =
- instanceDefinition.getResourceOperations();
- return resources.getComponentOptInt(roleStatus.getGroup(),
- CONTAINER_FAILURE_THRESHOLD,
- failureThreshold);
+ // The threshold is read as a long property of the role's component
+ // configuration and narrowed to int.
+ // NOTE(review): buildResourceRequirements() handles a null component
+ // (the AM role); this dereference assumes it is never null here -- confirm.
+ return (int) roleStatus.getProviderRole().component.getConfiguration()
+ .getPropertyLong(CONTAINER_FAILURE_THRESHOLD,
+ DEFAULT_CONTAINER_FAILURE_THRESHOLD);
}
- /**
- * Get the node failure threshold for a specific role, falling back to
- * the global one if not
- * @param roleGroup role group
- * @return the threshold for failures
- */
- private int getNodeFailureThresholdForRole(String roleGroup) {
- ConfTreeOperations resources =
- instanceDefinition.getResourceOperations();
- return resources.getComponentOptInt(roleGroup,
- NODE_FAILURE_THRESHOLD,
- nodeFailureThreshold);
- }
/**
* Reset the "recent" failure counts of all roles
@@ -2027,9 +1498,9 @@ public class AppState {
public void resetFailureCounts() {
for (RoleStatus roleStatus : getRoleStatusMap().values()) {
long failed = roleStatus.resetFailedRecently();
- log.info("Resetting failure count of {}; was {}",
- roleStatus.getName(),
+ log.info("Resetting failure count of {}; was {}", roleStatus.getName(),
failed);
+
}
roleHistory.resetFailedRecently();
}
@@ -2075,6 +1546,7 @@ public class AppState {
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
throws SliderInternalStateException, TriggerClusterTeardownException {
+ log.info("review one role " + role.getName());
List<AbstractRMOperation> operations = new ArrayList<>();
long delta;
long expected;
@@ -2123,7 +1595,8 @@ public class AppState {
log.warn("Awaiting node map before generating anti-affinity requests");
}
log.info("Setting pending to {}", pending);
- role.setPendingAntiAffineRequests(pending);
+ //TODO
+ role.setAAPending((int)pending);
} else {
for (int i = 0; i < delta; i++) {
@@ -2139,7 +1612,7 @@ public class AppState {
long excess = -delta;
// how many requests are outstanding? for AA roles, this includes pending
- long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests();
+ long outstandingRequests = role.getPending() + role.getAAPending();
if (outstandingRequests > 0) {
// outstanding requests.
int toCancel = (int)Math.min(outstandingRequests, excess);
@@ -2153,8 +1626,7 @@ public class AppState {
" expected to be able to cancel {} requests, but got {}",
toCancel, cancellations.size());
}
-
- role.cancel(toCancel);
+ decPendingContainers(role, toCancel);
excess -= toCancel;
assert excess >= 0 : "Attempted to cancel too many requests";
log.info("Submitted {} cancellations, leaving {} to release",
@@ -2215,9 +1687,9 @@ public class AppState {
} else {
// actual + requested == desired
// there's a special case here: clear all pending AA requests
- if (role.getPendingAntiAffineRequests() > 0) {
+ if (role.getAAPending() > 0) {
log.debug("Clearing outstanding pending AA requests");
- role.setPendingAntiAffineRequests(0);
+ role.setAAPending(0);
}
}
@@ -2269,28 +1741,6 @@ public class AppState {
}
/**
- * Find a container running on a specific host -looking
- * into the node ID to determine this.
- *
- * @param node node
- * @param roleId role the container must be in
- * @return a container or null if there are no containers on this host
- * that can be released.
- */
- private RoleInstance findRoleInstanceOnHost(NodeInstance node, int roleId) {
- Collection<RoleInstance> targets = cloneOwnedContainerList();
- String hostname = node.hostname;
- for (RoleInstance ri : targets) {
- if (hostname.equals(RoleHistoryUtils.hostnameOf(ri.container))
- && ri.roleId == roleId
- && containersBeingReleased.get(ri.getContainerId()) == null) {
- return ri;
- }
- }
- return null;
- }
-
- /**
* Release all containers.
* @return a list of operations to execute
*/
@@ -2329,26 +1779,25 @@ public class AppState {
* @param assignments the assignments of roles to containers
* @param operations any allocation or release operations
*/
- public synchronized void onContainersAllocated(List<Container> allocatedContainers,
- List<ContainerAssignment> assignments,
- List<AbstractRMOperation> operations) {
- assignments.clear();
- operations.clear();
+ public synchronized void onContainersAllocated(
+ List<Container> allocatedContainers,
+ List<ContainerAssignment> assignments,
+ List<AbstractRMOperation> operations) {
List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers);
- log.debug("onContainersAllocated(): Total containers allocated = {}", ordered.size());
+ log.info("onContainersAllocated(): Total containers allocated = {}", ordered.size());
for (Container container : ordered) {
final NodeId nodeId = container.getNodeId();
String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort();
//get the role
final ContainerId cid = container.getId();
final RoleStatus role = lookupRoleStatus(container);
-
- //dec requested count
- role.decRequested();
+ decPendingContainers(role);
//inc allocated count -this may need to be dropped in a moment,
// but us needed to update the logic below
- final long allocated = role.incActual();
+ MutableGaugeInt containersRunning = role.getComponentMetrics().containersRunning;
+ // Increment before reading, so that "allocated" includes this container.
+ // NOTE(review): the removed role.incActual() appears to have returned the
+ // post-increment count -- confirm against the surplus check that follows.
+ incRunningContainers(role);
+ final long allocated = containersRunning.value();
final long desired = role.getDesired();
final String roleName = role.getName();
@@ -2364,22 +1813,12 @@ public class AppState {
log.info("Discarding surplus {} container {} on {}", roleName, cid, containerHostInfo);
operations.add(new ContainerReleaseOperation(cid));
//register as a surplus node
- surplusNodes.add(cid);
- surplusContainers.inc();
- //and, as we aren't binding it to role, dec that role's actual count
- role.decActual();
+ surplusContainers.add(cid);
+ role.getComponentMetrics().surplusContainers.incr();
+ // undo incRunningContainers(role) on both the component metrics and
+ // appMetrics; decrementing only the component gauge would leave the
+ // appMetrics running count one too high for every surplus container
+ decRunningContainers(role);
} else {
-
- // Allocation being accepted -so decrement the number of outstanding requests
- decOutstandingContainerRequests();
-
- log.info("Assigning role {} to container" +
- " {}," +
- " on {}:{},",
- roleName,
- cid,
- nodeId.getHost(),
- nodeId.getPort());
+ log.info("Assigning role {} to container" + " {}," + " on {}:{},",
+ roleName, cid, nodeId.getHost(), nodeId.getPort());
assignments.add(new ContainerAssignment(container, role, outcome));
//add to the history
@@ -2392,13 +1831,13 @@ public class AppState {
if (node.canHost(role.getKey(), role.getLabelExpression())) {
log.error("Assigned node still declares as available {}", node.toFullString() );
}
- if (role.getPendingAntiAffineRequests() > 0) {
+ if (role.getAAPending() > 0) {
// still an outstanding AA request: need to issue a new one.
log.info("Asking for next container for AA role {}", roleName);
if (!addContainerRequest(operations, createAAContainerRequest(role))) {
log.info("No capacity in cluster for new requests");
} else {
- role.decPendingAntiAffineRequests();
+ role.decAAPending();
}
log.debug("Current AA role status {}", role);
} else {
@@ -2437,8 +1876,7 @@ public class AppState {
for (Container container : liveContainers) {
addRestartedContainer(container);
}
- clusterStatus.setInfo(StatusKeys.INFO_CONTAINERS_AM_RESTART,
- Integer.toString(liveContainers.size()));
+ app.setNumberOfRunningContainers((long)liveContainers.size());
return true;
}
@@ -2458,10 +1896,9 @@ public class AppState {
// get the role
int roleId = ContainerPriority.extractRole(container);
- RoleStatus role =
- lookupRoleStatus(roleId);
+ RoleStatus role = lookupRoleStatus(roleId);
// increment its count
- role.incActual();
+ incRunningContainers(role);
String roleName = role.getName();
log.info("Rebuilding container {} in role {} on {},",
@@ -2495,12 +1932,6 @@ public class AppState {
final StringBuilder sb = new StringBuilder("AppState{");
sb.append("applicationLive=").append(applicationLive);
sb.append(", live nodes=").append(liveNodes.size());
- sb.append(", startedContainers=").append(startedContainers);
- sb.append(", startFailedContainerCount=").append(startFailedContainerCount);
- sb.append(", surplusContainers=").append(surplusContainers);
- sb.append(", failedContainerCount=").append(failedContainerCount);
- sb.append(", outstanding non-AA Container Requests=")
- .append(outstandingContainerRequests);
sb.append('}');
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
index a8aa1a2..2dfded8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.api.resource.Application;
import org.apache.slider.providers.ProviderRole;
import java.util.ArrayList;
@@ -38,26 +38,24 @@ import java.util.Map;
* are added.
*/
public class AppStateBindingInfo {
- public AggregateConf instanceDefinition;
public Configuration serviceConfig = new Configuration();
- public Configuration publishedProviderConf = new Configuration(false);
+ public Application application = null;
public List<ProviderRole> roles = new ArrayList<>();
public FileSystem fs;
public Path historyPath;
public List<Container> liveContainers = new ArrayList<>(0);
- public Map<String, String> applicationInfo = new HashMap<>();
public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector();
/** node reports off the RM. */
public List<NodeReport> nodeReports = new ArrayList<>(0);
+ /**
+ * Check that the fields needed for binding have been set.
+ * @throws IllegalArgumentException if serviceConfig, releaseSelector,
+ * roles, fs, historyPath, nodeReports or application is null
+ */
public void validate() throws IllegalArgumentException {
- Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition");
Preconditions.checkArgument(serviceConfig != null, "null appmasterConfig");
- Preconditions.checkArgument(publishedProviderConf != null, "null publishedProviderConf");
Preconditions.checkArgument(releaseSelector != null, "null releaseSelector");
Preconditions.checkArgument(roles != null, "null providerRoles");
Preconditions.checkArgument(fs != null, "null fs");
Preconditions.checkArgument(historyPath != null, "null historyDir");
Preconditions.checkArgument(nodeReports != null, "null nodeReports");
+ Preconditions.checkArgument(application != null, "null application");
+
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
index 37e9a7f..8046472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -21,14 +21,12 @@ package org.apache.slider.server.appmaster.state;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.resource.Application;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.NodeInformation;
import org.apache.slider.api.types.RoleStatistics;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedExportsSet;
@@ -130,46 +128,16 @@ public class ProviderAppState implements StateAccessForProviders {
}
@Override
- public ClusterDescription getClusterStatus() {
+ public Application getApplication() {
+ // Delegates to appState; the accessor there is still named
+ // getClusterStatus() although the value is returned as an Application.
return appState.getClusterStatus();
}
@Override
- public ConfTreeOperations getResourcesSnapshot() {
- return appState.getResourcesSnapshot();
- }
-
- @Override
- public ConfTreeOperations getAppConfSnapshot() {
- return appState.getAppConfSnapshot();
- }
-
- @Override
- public ConfTreeOperations getInternalsSnapshot() {
- return appState.getInternalsSnapshot();
- }
-
- @Override
public boolean isApplicationLive() {
return appState.isApplicationLive();
}
@Override
- public long getSnapshotTime() {
- return appState.getSnapshotTime();
- }
-
- @Override
- public AggregateConf getInstanceDefinitionSnapshot() {
- return appState.getInstanceDefinitionSnapshot();
- }
-
- @Override
- public AggregateConf getUnresolvedInstanceDefinition() {
- return appState.getUnresolvedInstanceDefinition();
- }
-
- @Override
public RoleStatus lookupRoleStatus(int key) {
return appState.lookupRoleStatus(key);
}
@@ -221,26 +189,16 @@ public class ProviderAppState implements StateAccessForProviders {
}
@Override
- public ClusterDescription refreshClusterStatus() {
+ public Application refreshClusterStatus() {
return appState.refreshClusterStatus();
}
@Override
- public List<RoleStatus> cloneRoleStatusList() {
- return appState.cloneRoleStatusList();
- }
-
- @Override
public ApplicationLivenessInformation getApplicationLivenessInformation() {
return appState.getApplicationLivenessInformation();
}
@Override
- public Map<String, Integer> getLiveStatistics() {
- return appState.getLiveStatistics();
- }
-
- @Override
public Map<String, ComponentInformation> getComponentInfoSnapshot() {
return appState.getComponentInfoSnapshot();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index 38c70f3..b6c3675 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -135,17 +135,6 @@ public class RoleHistory {
outstandingRequests = new OutstandingRequestTracker();
}
- /**
- * Register all metrics with the metrics infra
- * @param metrics metrics
- */
- public void register(MetricsAndMonitoring metrics) {
- metrics.register(RoleHistory.class, dirty, "dirty");
- metrics.register(RoleHistory.class, nodesUpdatedTime, "nodes-updated.time");
- metrics.register(RoleHistory.class, nodeUpdateReceived, "nodes-updated.flag");
- metrics.register(RoleHistory.class, thawedDataTime, "thawed.time");
- metrics.register(RoleHistory.class, saveTime, "saved.time");
- }
/**
* safety check: make sure the role is unique amongst
@@ -1102,13 +1091,13 @@ public class RoleHistory {
int roleId = role.getKey();
List<OutstandingRequest> requests = new ArrayList<>(toCancel);
// there may be pending requests which can be cancelled here
- long pending = role.getPendingAntiAffineRequests();
+ long pending = role.getAAPending();
if (pending > 0) {
// there are some pending ones which can be cancelled first
long pendingToCancel = Math.min(pending, toCancel);
log.info("Cancelling {} pending AA allocations, leaving {}", toCancel,
pendingToCancel);
- role.setPendingAntiAffineRequests(pending - pendingToCancel);
+ role.setAAPending(pending - pendingToCancel);
toCancel -= pendingToCancel;
}
if (toCancel > 0 && role.isAARequestOutstanding()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
index 30cfec9..de52f4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
@@ -29,6 +29,7 @@ import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.providers.ProviderRole;
import java.util.ArrayList;
import java.util.Arrays;
@@ -40,6 +41,7 @@ import java.util.List;
public final class RoleInstance implements Cloneable {
public Container container;
+ public ProviderRole providerRole;
/**
* Container ID
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org