You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/10/11 20:37:18 UTC
[14/50] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
new file mode 100644
index 0000000..37e9a7f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.types.ApplicationLivenessInformation;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.api.types.RoleStatistics;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
+import org.apache.slider.core.registry.docstore.PublishedConfigSet;
+import org.apache.slider.core.registry.docstore.PublishedExportsSet;
+import org.apache.slider.server.appmaster.web.rest.RestPaths;
+import org.apache.slider.server.services.utility.PatternValidator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@link StateAccessForProviders}: the view of application
+ * state offered to providers, the web UI and the IPC/REST layer.
+ * <p>
+ * Nearly every call is forwarded to the wrapped {@link AppState}. The only
+ * state owned here is the application name and the published configuration
+ * and export sets.
+ */
+public class ProviderAppState implements StateAccessForProviders {
+
+  /** Checks the name of a config set before that set is first created. */
+  private static final PatternValidator VALIDATOR = new PatternValidator(
+      RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP);
+
+  /** Target of all delegated queries. */
+  private final AppState appState;
+
+  /** Published configuration sets, keyed by set name. */
+  private final Map<String, PublishedConfigSet> publishedConfigSets =
+      new ConcurrentHashMap<>(5);
+
+  /** Published exports. */
+  private final PublishedExportsSet publishedExportsSets =
+      new PublishedExportsSet();
+
+  private String applicationName;
+
+  /**
+   * Create the view.
+   * @param applicationName name returned by {@link #getApplicationName()}
+   * @param appState application state that queries are delegated to
+   */
+  public ProviderAppState(String applicationName, AppState appState) {
+    this.applicationName = applicationName;
+    this.appState = appState;
+  }
+
+  /**
+   * Replace the application name.
+   * @param applicationName the new name
+   */
+  public void setApplicationName(String applicationName) {
+    this.applicationName = applicationName;
+  }
+
+  @Override
+  public String getApplicationName() {
+    return this.applicationName;
+  }
+
+  // ---- published configurations and exports: state held in this class ----
+
+  @Override
+  public PublishedConfigSet getPublishedSliderConfigurations() {
+    // the slider config set is created on first access
+    return getOrCreatePublishedConfigSet(RestPaths.SLIDER_CONFIGSET);
+  }
+
+  @Override
+  public PublishedExportsSet getPublishedExportsSet() {
+    return this.publishedExportsSets;
+  }
+
+  @Override
+  public PublishedConfigSet getPublishedConfigSet(String name) {
+    // null when no set has been created under this name
+    return this.publishedConfigSets.get(name);
+  }
+
+  @Override
+  public PublishedConfigSet getOrCreatePublishedConfigSet(String name) {
+    PublishedConfigSet existing = publishedConfigSets.get(name);
+    if (existing != null) {
+      return existing;
+    }
+    // only names that do not exist yet are validated
+    VALIDATOR.validate(name);
+    synchronized (publishedConfigSets) {
+      // re-check under the lock so that an existing set is never replaced
+      existing = publishedConfigSets.get(name);
+      if (existing == null) {
+        existing = new PublishedConfigSet();
+        publishedConfigSets.put(name, existing);
+      }
+      return existing;
+    }
+  }
+
+  @Override
+  public List<String> listConfigSets() {
+    // snapshot the names under the same lock used when creating sets
+    synchronized (publishedConfigSets) {
+      return new ArrayList<>(publishedConfigSets.keySet());
+    }
+  }
+
+  // ---- pure delegation to the application state ----
+
+  @Override
+  public Map<Integer, RoleStatus> getRoleStatusMap() {
+    return this.appState.getRoleStatusMap();
+  }
+
+  @Override
+  public Map<ContainerId, RoleInstance> getFailedContainers() {
+    return this.appState.getFailedContainers();
+  }
+
+  @Override
+  public Map<ContainerId, RoleInstance> getLiveContainers() {
+    return this.appState.getLiveContainers();
+  }
+
+  @Override
+  public ClusterDescription getClusterStatus() {
+    return this.appState.getClusterStatus();
+  }
+
+  @Override
+  public ConfTreeOperations getResourcesSnapshot() {
+    return this.appState.getResourcesSnapshot();
+  }
+
+  @Override
+  public ConfTreeOperations getAppConfSnapshot() {
+    return this.appState.getAppConfSnapshot();
+  }
+
+  @Override
+  public ConfTreeOperations getInternalsSnapshot() {
+    return this.appState.getInternalsSnapshot();
+  }
+
+  @Override
+  public boolean isApplicationLive() {
+    return this.appState.isApplicationLive();
+  }
+
+  @Override
+  public long getSnapshotTime() {
+    return this.appState.getSnapshotTime();
+  }
+
+  @Override
+  public AggregateConf getInstanceDefinitionSnapshot() {
+    return this.appState.getInstanceDefinitionSnapshot();
+  }
+
+  @Override
+  public AggregateConf getUnresolvedInstanceDefinition() {
+    return this.appState.getUnresolvedInstanceDefinition();
+  }
+
+  @Override
+  public RoleStatus lookupRoleStatus(int key) {
+    return this.appState.lookupRoleStatus(key);
+  }
+
+  @Override
+  public RoleStatus lookupRoleStatus(Container c) throws YarnRuntimeException {
+    return this.appState.lookupRoleStatus(c);
+  }
+
+  @Override
+  public RoleStatus lookupRoleStatus(String name) throws YarnRuntimeException {
+    return this.appState.lookupRoleStatus(name);
+  }
+
+  @Override
+  public List<RoleInstance> cloneOwnedContainerList() {
+    return this.appState.cloneOwnedContainerList();
+  }
+
+  @Override
+  public int getNumOwnedContainers() {
+    return this.appState.getNumOwnedContainers();
+  }
+
+  @Override
+  public RoleInstance getOwnedContainer(ContainerId id) {
+    return this.appState.getOwnedContainer(id);
+  }
+
+  @Override
+  public RoleInstance getOwnedContainer(String id) throws NoSuchNodeException {
+    // string IDs are resolved through the container-ID lookup
+    return this.appState.getOwnedInstanceByContainerID(id);
+  }
+
+  @Override
+  public List<RoleInstance> cloneLiveContainerInfoList() {
+    return this.appState.cloneLiveContainerInfoList();
+  }
+
+  @Override
+  public RoleInstance getLiveInstanceByContainerID(String containerId)
+      throws NoSuchNodeException {
+    return this.appState.getLiveInstanceByContainerID(containerId);
+  }
+
+  @Override
+  public List<RoleInstance> getLiveInstancesByContainerIDs(
+      Collection<String> containerIDs) {
+    return this.appState.getLiveInstancesByContainerIDs(containerIDs);
+  }
+
+  @Override
+  public ClusterDescription refreshClusterStatus() {
+    return this.appState.refreshClusterStatus();
+  }
+
+  @Override
+  public List<RoleStatus> cloneRoleStatusList() {
+    return this.appState.cloneRoleStatusList();
+  }
+
+  @Override
+  public ApplicationLivenessInformation getApplicationLivenessInformation() {
+    return this.appState.getApplicationLivenessInformation();
+  }
+
+  @Override
+  public Map<String, Integer> getLiveStatistics() {
+    return this.appState.getLiveStatistics();
+  }
+
+  @Override
+  public Map<String, ComponentInformation> getComponentInfoSnapshot() {
+    return this.appState.getComponentInfoSnapshot();
+  }
+
+  @Override
+  public Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
+    return this.appState.createRoleToClusterNodeMap();
+  }
+
+  // ---- queries derived from the delegated state ----
+
+  @Override
+  public List<RoleInstance> enumLiveInstancesInRole(String role) {
+    Collection<RoleInstance> live = cloneLiveContainerInfoList();
+    List<RoleInstance> inRole = new ArrayList<>();
+    for (RoleInstance candidate : live) {
+      // an empty role name matches every live instance
+      boolean matches = role.isEmpty() || role.equals(candidate.role);
+      if (matches) {
+        inRole.add(candidate);
+      }
+    }
+    return inRole;
+  }
+
+  @Override
+  public List<RoleInstance> lookupRoleContainers(String component) {
+    RoleStatus status = lookupRoleStatus(component);
+    List<RoleInstance> owned = cloneOwnedContainerList();
+    List<RoleInstance> result = new ArrayList<>(owned.size());
+    // owned containers are matched on the role's priority
+    int priority = status.getPriority();
+    for (RoleInstance instance : owned) {
+      if (instance.roleId == priority) {
+        result.add(instance);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public ComponentInformation getComponentInformation(String component) {
+    ComponentInformation info = lookupRoleStatus(component).serialize();
+    List<RoleInstance> roleContainers = lookupRoleContainers(component);
+    info.containers = new ArrayList<>(roleContainers.size());
+    for (RoleInstance instance : roleContainers) {
+      info.containers.add(instance.id);
+    }
+    return info;
+  }
+
+  @Override
+  public Map<String, NodeInformation> getNodeInformationSnapshot() {
+    RoleHistory history = appState.getRoleHistory();
+    return history.getNodeInformationSnapshot(appState.buildNamingMap());
+  }
+
+  @Override
+  public NodeInformation getNodeInformation(String hostname) {
+    RoleHistory history = appState.getRoleHistory();
+    return history.getNodeInformation(hostname, appState.buildNamingMap());
+  }
+
+  @Override
+  public RoleStatistics getRoleStatistics() {
+    return this.appState.getRoleStatistics();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
new file mode 100644
index 0000000..4e8a4d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -0,0 +1,1101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.BoolMetric;
+import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
+import org.apache.slider.server.appmaster.management.Timestamp;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.avro.LoadedRoleHistory;
+import org.apache.slider.server.avro.NodeEntryRecord;
+import org.apache.slider.server.avro.RoleHistoryHeader;
+import org.apache.slider.server.avro.RoleHistoryWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Role History.
+ * <p>
+ * Synchronization policy: all public operations are synchronized.
+ * Protected methods are in place for testing -no guarantees are made.
+ * <p>
+ * Inner classes have no synchronization guarantees; they should be manipulated
+ * in these classes and not externally.
+ * <p>
+ * Note that as well as some methods marked visible for testing, there
+ * is the option for the time generator method, {@link #now()} to
+ * be overridden so that a repeatable time series can be used.
+ *
+ */
+public class RoleHistory {
+ /** Logger for this class. */
+ protected static final Logger log =
+ LoggerFactory.getLogger(RoleHistory.class);
+ /** Provider roles registered through addNewRole(); searched by lookupRole(). */
+ private final List<ProviderRole> providerRoles;
+ /** the roles in here are shared with App State; keyed by RoleStatus.getKey() (see putRole) */
+ private final Map<Integer, RoleStatus> roleStatusMap = new HashMap<>();
+ /** Factory used to create YARN records such as Resource instances. */
+ private final AbstractClusterServices recordFactory;
+
+ /** Time recorded by onStart(); 0 until then. */
+ private long startTime;
+
+ /** Time when saved */
+ private final Timestamp saveTime = new Timestamp(0);
+
+ /** If the history was loaded, the time at which the history was saved.
+ * That is: the time the data was valid */
+ private final Timestamp thawedDataTime = new Timestamp(0);
+
+ /** Per-host node instances; replaced with a fresh map by reset(). */
+ private NodeMap nodemap;
+ /**
+ * Number of roles passed to the constructor.
+ * NOTE(review): addNewRole() does not update this, so roles added later are
+ * not covered by loops that iterate 0..roleSize-1 — confirm this is intended.
+ */
+ private int roleSize;
+ /** Set when the history changes; cleared by saved(). */
+ private final BoolMetric dirty = new BoolMetric(false);
+ /** Filesystem holding the history files; assigned once in onStart(). */
+ private FileSystem filesystem;
+ /** Directory of the history files; assigned in onStart(). */
+ private Path historyPath;
+ /** Reads, writes and purges the persisted history files. */
+ private RoleHistoryWriter historyWriter = new RoleHistoryWriter();
+
+ /**
+ * When were the nodes updated in a {@link #onNodesUpdated(List)} call?
+ * If zero: never.
+ */
+ private final Timestamp nodesUpdatedTime = new Timestamp(0);
+ /** Published as the "nodes-updated.flag" metric; presumably set when a node update arrives — confirm. */
+ private final BoolMetric nodeUpdateReceived = new BoolMetric(false);
+
+ /** Tracker of outstanding container requests; replaced by reset(). */
+ private OutstandingRequestTracker outstandingRequests =
+ new OutstandingRequestTracker();
+
+ /**
+ * For each role, lists nodes that are available for data-local allocation,
+ * ordered by more recently released - to accelerate node selection.
+ * That is, they are "recently used nodes".
+ * Keyed by role id; the whole map is replaced by resetAvailableNodeLists().
+ */
+ private Map<Integer, LinkedList<NodeInstance>> recentNodes;
+
+  /**
+   * Create a role history for an initial set of roles.
+   * <p>
+   * Each role is validated and registered through
+   * {@link #addNewRole(RoleStatus)}. Afterwards the node map, recent-node
+   * lists and request tracker are initialized by {@link #reset()}.
+   * @param roles initial role list
+   * @param recordFactory yarn record factory, used to create resource records
+   * @throws BadConfigException if a role has a negative key or one that
+   * duplicates another role
+   */
+  public RoleHistory(Collection<RoleStatus> roles,
+      AbstractClusterServices recordFactory) throws BadConfigException {
+    this.recordFactory = recordFactory;
+    this.roleSize = roles.size();
+    this.providerRoles = new ArrayList<>(this.roleSize);
+    for (RoleStatus initialRole : roles) {
+      addNewRole(initialRole);
+    }
+    reset();
+  }
+
+  /**
+   * Reset the mutable history state: a new node map sized for the role
+   * count, empty recent-node lists and a new outstanding-request tracker.
+   * The registered roles themselves are left untouched.
+   * @throws BadConfigException as declared for callers and subclasses
+   */
+  protected synchronized void reset() throws BadConfigException {
+    this.nodemap = new NodeMap(roleSize);
+    resetAvailableNodeLists();
+    this.outstandingRequests = new OutstandingRequestTracker();
+  }
+
+  /**
+   * Publish the history's metrics (dirty flag, node-update time and flag,
+   * thaw time and save time) through the given metrics registry.
+   * @param metrics metrics registry
+   */
+  public void register(MetricsAndMonitoring metrics) {
+    Class<RoleHistory> owner = RoleHistory.class;
+    metrics.register(owner, dirty, "dirty");
+    metrics.register(owner, nodesUpdatedTime, "nodes-updated.time");
+    metrics.register(owner, nodeUpdateReceived, "nodes-updated.flag");
+    metrics.register(owner, thawedDataTime, "thawed.time");
+    metrics.register(owner, saveTime, "saved.time");
+  }
+
+  /**
+   * Add a role status to the role map after checking that its key is
+   * non-negative and not already in use.
+   * @param roleStatus role
+   * @throws BadConfigException if the key is negative or already registered
+   */
+  protected void putRole(RoleStatus roleStatus) throws BadConfigException {
+    int key = roleStatus.getKey();
+    if (key < 0) {
+      throw new BadConfigException("Provider " + roleStatus + " id is out of range");
+    }
+    RoleStatus existing = roleStatusMap.get(key);
+    if (existing != null) {
+      throw new BadConfigException(
+          roleStatus.toString() + " id duplicates that of " + existing);
+    }
+    roleStatusMap.put(key, roleStatus);
+  }
+
+ /**
+ * Add a new role
+ * @param roleStatus new role
+ */
+ public void addNewRole(RoleStatus roleStatus) throws BadConfigException {
+ log.debug("Validating/adding new role to role history: {} ", roleStatus);
+ putRole(roleStatus);
+ this.providerRoles.add(roleStatus.getProviderRole());
+ }
+
+  /**
+   * Look up a registered provider role by ID.
+   * <p>
+   * Synchronized so that the scan cannot race with
+   * {@link #addNewRole(RoleStatus)} appending to the same list.
+   * @param roleId role Id
+   * @return the role, or null if no role with that ID is registered
+   */
+  public synchronized ProviderRole lookupRole(int roleId) {
+    for (ProviderRole candidate : providerRoles) {
+      if (candidate.id == roleId) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Discard every recent-node list by swapping in a new, empty map.
+   */
+  private synchronized void resetAvailableNodeLists() {
+    Map<Integer, LinkedList<NodeInstance>> emptyLists =
+        new ConcurrentHashMap<>(roleSize);
+    recentNodes = emptyLists;
+  }
+
+  /**
+   * Prepare the history for re-reading its state: reset it, warn if the
+   * saved role count differs from the current one, and record the saved
+   * time as the thaw time.
+   * <p>
+   * This is intended for use by the RoleWriter logic.
+   * @param header header of the loaded history
+   * @throws BadConfigException if there is a problem rebuilding the state
+   */
+  private void prepareForReading(RoleHistoryHeader header)
+      throws BadConfigException {
+    reset();
+    int expectedRoles = roleSize;
+    int sourceRoles = header.getRoles();
+    if (sourceRoles != expectedRoles) {
+      log.warn("Number of roles in source {}"
+          + " does not match the expected number of {}",
+          sourceRoles, expectedRoles);
+    }
+    // record when the loaded data was valid
+    setThawedDataTime(header.getSaved());
+  }
+
+  /**
+   * Rebuild the placement history from a loaded role history.
+   * <p>
+   * Records for roles that are not registered are logged and skipped. Records
+   * that were active when saved get the save time as their last-used time.
+   * @param loadedRoleHistory loaded history
+   * @return the number of records discarded
+   * @throws BadConfigException if there is a problem rebuilding the state
+   */
+  @VisibleForTesting
+  public synchronized int rebuild(LoadedRoleHistory loadedRoleHistory)
+      throws BadConfigException {
+    RoleHistoryHeader header = loadedRoleHistory.getHeader();
+    prepareForReading(header);
+    int discardCount = 0;
+    Long saveTimestamp = header.getSaved();
+    for (NodeEntryRecord record : loadedRoleHistory.records) {
+      Integer role = record.getRole();
+      NodeEntry entry = new NodeEntry(role);
+      entry.setLastUsed(record.getLastUsed());
+      if (record.getActive()) {
+        // in use when the history was saved: treat the save time as last use
+        entry.setLastUsed(saveTimestamp);
+      }
+      String host = SliderUtils.sequenceToString(record.getHost());
+      if (lookupRole(role) == null) {
+        log.info("Discarding history entry with unknown role: {} on host {}",
+            role, host);
+        discardCount++;
+        continue;
+      }
+      getOrCreateNodeInstance(host).set(role, entry);
+    }
+    return discardCount;
+  }
+
+  /**
+   * @return the time recorded by onStart(), or 0 if it has not been called
+   */
+  public synchronized long getStartTime() {
+    return this.startTime;
+  }
+
+  /**
+   * @return the timestamp of the last save; 0 if never saved
+   */
+  public synchronized long getSaveTime() {
+    return this.saveTime.get();
+  }
+
+  /**
+   * @return the save time of the loaded history; 0 if none was loaded
+   */
+  public long getThawedDataTime() {
+    return this.thawedDataTime.get();
+  }
+
+ public void setThawedDataTime(long thawedDataTime) {
+ this.thawedDataTime.set(thawedDataTime);
+ }
+
+  /**
+   * @return the number of roles the history was constructed with
+   */
+  public synchronized int getRoleSize() {
+    return this.roleSize;
+  }
+
+  /**
+   * Get the total size of the cluster as known to the history.
+   * @return the number of node instances in the node map
+   */
+  public synchronized int getClusterSize() {
+    return this.nodemap.size();
+  }
+
+  /**
+   * @return true if the history has changed since it was last saved
+   */
+  public synchronized boolean isDirty() {
+    return this.dirty.get();
+  }
+
+  /**
+   * Set or clear the dirty flag.
+   * @param value new flag value
+   */
+  public synchronized void setDirty(boolean value) {
+    dirty.set(value);
+  }
+
+  /**
+   * Tell the history that it has been saved: record the save time and mark
+   * the history clean.
+   * @param timestamp time of the save
+   */
+  public synchronized void saved(long timestamp) {
+    saveTime.set(timestamp);
+    setDirty(false);
+  }
+
+ /**
+ * Get a clone of the nodemap.
+ * The instances inside are not cloned
+ * @return the map
+ */
+ public synchronized NodeMap cloneNodemap() {
+ return (NodeMap) nodemap.clone();
+ }
+
+  /**
+   * Get a snapshot of the node map, serialized per host.
+   * @param naming map of priority to entry name; entries must be unique.
+   * It may be incomplete; missing entries fall back to numbers.
+   * @return a new map of hostname to serialized node information
+   */
+  public synchronized Map<String, NodeInformation> getNodeInformationSnapshot(
+      Map<Integer, String> naming) {
+    Map<String, NodeInformation> snapshot = new HashMap<>(nodemap.size());
+    for (Map.Entry<String, NodeInstance> hostEntry : nodemap.entrySet()) {
+      String host = hostEntry.getKey();
+      snapshot.put(host, hostEntry.getValue().serialize(naming));
+    }
+    return snapshot;
+  }
+
+  /**
+   * Get the serialized information on one host.
+   * @param hostname hostname
+   * @param naming map of priority to entry name; entries must be unique.
+   * It may be incomplete; missing entries fall back to numbers.
+   * @return the information about that host, or null if it is not in the map
+   */
+  public synchronized NodeInformation getNodeInformation(String hostname,
+      Map<Integer, String> naming) {
+    NodeInstance instance = nodemap.get(hostname);
+    if (instance == null) {
+      return null;
+    }
+    return instance.serialize(naming);
+  }
+
+ /**
+ * Get the node instance for the specific node -creating it if needed
+ * @param hostname node address
+ * @return the instance
+ */
+ public synchronized NodeInstance getOrCreateNodeInstance(String hostname) {
+ //convert to a string
+ return nodemap.getOrCreate(hostname);
+ }
+
+  /**
+   * Bulk-insert nodes into the node map, overwriting any with the same name.
+   * This is for testing.
+   * <p>
+   * Important: the recent-node lists are not updated; rebuild them afterwards.
+   * @param nodes nodes to insert
+   */
+  @VisibleForTesting
+  public synchronized void insert(Collection<NodeInstance> nodes) {
+    this.nodemap.insert(nodes);
+  }
+
+ /**
+ * Get current time. overrideable for test subclasses
+ * @return current time in millis
+ */
+ protected long now() {
+ return System.currentTimeMillis();
+ }
+
+  /**
+   * Mark the history as dirty and immediately try to save it.
+   * <p>
+   * A failed save is logged and otherwise ignored; the history then stays
+   * dirty, as it is only marked clean after a successful write.
+   */
+  public void touch() {
+    setDirty(true);
+    try {
+      saveHistoryIfDirty();
+    } catch (IOException ioe) {
+      log.warn("Failed to save history file ", ioe);
+    }
+  }
+
+  /**
+   * Reset the "failed recently" counters of every node in the map.
+   */
+  public synchronized void resetFailedRecently() {
+    log.info("Resetting failure history");
+    this.nodemap.resetFailedRecently();
+  }
+
+  /**
+   * Get the directory used for history files.
+   * @return the directory passed to onStart(), or null before then
+   */
+  public Path getHistoryPath() {
+    return this.historyPath;
+  }
+
+  /**
+   * Save the history under a filename derived from the timestamp, then
+   * update the save time and clear the dirty flag.
+   * @param time timestamp to use as the save time
+   * @return the path saved to
+   * @throws IOException IO problems; the history is then left dirty
+   */
+  @VisibleForTesting
+  public synchronized Path saveHistory(long time) throws IOException {
+    Path target = historyWriter.createHistoryFilename(historyPath, time);
+    historyWriter.write(filesystem, target, true, this, time);
+    // only reached once the write succeeded
+    saved(time);
+    return target;
+  }
+
+  /**
+   * Save the history with the current time, but only if it is dirty.
+   * @return the path saved to, or null if nothing needed saving
+   * @throws IOException failed to save for some reason
+   */
+  public synchronized Path saveHistoryIfDirty() throws IOException {
+    return isDirty() ? saveHistory(now()) : null;
+  }
+
+  /**
+   * Start up: bind the filesystem and history directory, record the start
+   * time and attempt to thaw any saved history.
+   * @param fs filesystem
+   * @param historyDir path in FS for history
+   * @return true if a saved history was thawed
+   * @throws BadConfigException if the loaded history cannot be rebuilt
+   */
+  public boolean onStart(FileSystem fs, Path historyDir) throws BadConfigException {
+    assert filesystem == null;
+    this.filesystem = fs;
+    this.historyPath = historyDir;
+    this.startTime = now();
+    // assume a thaw; onThaw() falls back to a bootstrap if nothing loads
+    return onThaw();
+  }
+
+  /**
+   * Handler for the bootstrap case, when there was no history to thaw;
+   * it only logs the event.
+   */
+  public void onBootstrap() {
+    final String message = "Role history bootstrapped";
+    log.debug(message);
+  }
+
+  /**
+   * Thaw the history from the history directory.
+   * <p>
+   * If a history can be loaded, the history is rebuilt from it, older history
+   * files are purged (on a best-effort basis) and the recent-node lists are
+   * rebuilt. Otherwise, including when loading raises an IOException,
+   * {@link #onBootstrap()} is invoked instead.
+   * @return true if a history was loaded and rebuilt
+   * @throws BadConfigException if the loaded history cannot be rebuilt
+   */
+  public synchronized boolean onThaw() throws BadConfigException {
+    assert filesystem != null;
+    assert historyPath != null;
+
+    LoadedRoleHistory loaded = null;
+    try {
+      loaded = historyWriter.loadFromHistoryDir(filesystem, historyPath);
+    } catch (IOException e) {
+      log.warn("Exception trying to load history from {}", historyPath, e);
+    }
+    if (loaded == null) {
+      // nothing usable: fall back to the bootstrap procedure
+      onBootstrap();
+      return false;
+    }
+
+    rebuild(loaded);
+    Path loadPath = loaded.getPath();
+    log.debug("loaded history from {}", loadPath);
+    // delete any older entries; failure here is not fatal
+    try {
+      int purged = historyWriter.purgeOlderHistoryEntries(filesystem, loadPath);
+      log.debug("Deleted {} old history entries", purged);
+    } catch (IOException e) {
+      log.info("Ignoring exception raised while trying to delete old entries",
+          e);
+    }
+    // start is then completed
+    buildRecentNodeLists();
+    return true;
+  }
+
+
+  /**
+   * Rebuild the recent-node lists from the node map.
+   * <p>
+   * For every role ID in 0..roleSize-1, each node with an available entry for
+   * that role is added to the role's list, and each list is then sorted.
+   */
+  @VisibleForTesting
+  public synchronized void buildRecentNodeLists() {
+    resetAvailableNodeLists();
+    for (NodeInstance node : nodemap.values()) {
+      for (int roleId = 0; roleId < roleSize; roleId++) {
+        NodeEntry entry = node.get(roleId);
+        if (entry == null || !entry.isAvailable()) {
+          continue;
+        }
+        log.debug("Adding {} for role {}", node, roleId);
+        listRecentNodesForRoleId(roleId).add(node);
+      }
+    }
+    for (int roleId = 0; roleId < roleSize; roleId++) {
+      sortRecentNodeList(roleId);
+    }
+  }
+
+  /**
+   * Get the live recent-node list for a role, without creating one.
+   * @param id role ID
+   * @return the list, or null if the role has none
+   */
+  @VisibleForTesting
+  public List<NodeInstance> getRecentNodesForRoleId(int id) {
+    List<NodeInstance> nodes = recentNodes.get(id);
+    return nodes;
+  }
+
+  /**
+   * Get the recent-node list for a role, creating and registering an empty
+   * one if there is none yet.
+   * @param id role ID
+   * @return the (possibly empty) live list
+   */
+  private LinkedList<NodeInstance> listRecentNodesForRoleId(int id) {
+    LinkedList<NodeInstance> nodes = recentNodes.get(id);
+    if (nodes != null) {
+      return nodes;
+    }
+    synchronized (this) {
+      // re-check under the lock before creating the list
+      nodes = recentNodes.get(id);
+      if (nodes == null) {
+        nodes = new LinkedList<>();
+        recentNodes.put(id, nodes);
+      }
+      return nodes;
+    }
+  }
+
+  /**
+   * Sort the recent-node list of a single role, if it has one, using the
+   * {@link NodeInstance.Preferred} ordering.
+   * @param role role to sort
+   */
+  private void sortRecentNodeList(int role) {
+    List<NodeInstance> nodes = getRecentNodesForRoleId(role);
+    if (nodes == null) {
+      return;
+    }
+    Collections.sort(nodes, new NodeInstance.Preferred(role));
+  }
+
+  /**
+   * Find a recently used node on which to place a new instance of a role.
+   * <p>
+   * The first node in the role's recent-node list that has no active
+   * instance of the role is chosen. Without strict placement, the node must
+   * also be online and below the role's failure threshold. The chosen node is
+   * removed from the (live) list, so it is not offered again.
+   * @param role role
+   * @return the instance, or null if placement is not desired or no suitable
+   * node is found
+   */
+  @VisibleForTesting
+  public synchronized NodeInstance findRecentNodeForNewInstance(RoleStatus role) {
+    if (!role.isPlacementDesired()) {
+      // no data locality policy
+      return null;
+    }
+    int roleId = role.getKey();
+    boolean strictPlacement = role.isStrictPlacement();
+    // Get the list of possible targets.
+    // This is a live list: changes here are preserved
+    List<NodeInstance> targets = getRecentNodesForRoleId(roleId);
+    if (targets == null) {
+      // nothing to allocate on
+      return null;
+    }
+
+    log.debug("There are {} node(s) to consider for {}", targets.size(), role.getName());
+    NodeInstance nodeInstance = null;
+    // iterate rather than index: the list is a LinkedList, where get(i) and
+    // remove(i) are linear, which made this scan quadratic
+    for (Iterator<NodeInstance> it = targets.iterator(); it.hasNext(); ) {
+      NodeInstance candidate = it.next();
+      if (candidate.getActiveRoleInstances(roleId) != 0) {
+        // already hosting an instance of this role
+        continue;
+      }
+      if (!strictPlacement) {
+        if (!candidate.isOnline()) {
+          // previously reported as a failure-threshold breach, which was misleading
+          log.info("Host {} is not online. Not requesting it for {}",
+              candidate.hostname, role.getName());
+          continue;
+        }
+        if (candidate.exceedsFailureThreshold(role)) {
+          // too many failures for this node
+          log.info("Recent node failures is higher than threshold {}. Not requesting host {}",
+              role.getNodeFailureThreshold(), candidate.hostname);
+          continue;
+        }
+      }
+      // claim the node
+      it.remove();
+      nodeInstance = candidate;
+      break;
+    }
+
+    if (nodeInstance == null) {
+      log.info("No node found for {}", role.getName());
+    }
+    return nodeInstance;
+  }
+
+ /**
+ * Find a node for use
+ * @param role role
+ * @return the instance, or null for none
+ */
+ @VisibleForTesting
+ public synchronized List<NodeInstance> findNodeForNewAAInstance(RoleStatus role) {
+ // all nodes that are live and can host the role; no attempt to exclude ones
+ // considered failing
+ return nodemap.findAllNodesForRole(role.getKey(), role.getLabelExpression());
+ }
+
+ /**
+ * Request an instance on a given node.
+ * An outstanding request is created & tracked, with the
+ * relevant node entry for that role updated.
+ *<p>
+ * The role status entries will also be tracked
+ * <p>
+ * Returns the request that is now being tracked.
+ * If the node instance is not null, it's details about the role is incremented
+ *
+ * @param node node to target or null for "any"
+ * @param role role to request
+ * @return the request
+ */
+ public synchronized OutstandingRequest requestInstanceOnNode(
+ NodeInstance node, RoleStatus role, Resource resource) {
+ OutstandingRequest outstanding = outstandingRequests.newRequest(node, role.getKey());
+ outstanding.buildContainerRequest(resource, role, now());
+ return outstanding;
+ }
+
+  /**
+   * Build a container request for a role.
+   * <p>
+   * Anti-affine roles go through {@link #requestContainerForAARole}. Other
+   * roles target a recently used node if one is found, and otherwise make a
+   * location-less request.
+   * @param role role status
+   * @return a request ready to go, or null if this is an AA request and no
+   * location can be found
+   */
+  public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) {
+    if (role.isAntiAffinePlacement()) {
+      return requestContainerForAARole(role);
+    }
+    Resource resource = recordFactory.newResource();
+    role.copyResourceRequirements(resource);
+    NodeInstance target = findRecentNodeForNewInstance(role);
+    return requestInstanceOnNode(target, role, resource);
+  }
+
+  /**
+   * Build a container request for an anti-affine role across all candidate
+   * nodes.
+   * @param role role status
+   * @return a request ready to go, or null if no candidate node is found
+   */
+  public synchronized OutstandingRequest requestContainerForAARole(RoleStatus role) {
+    List<NodeInstance> candidates = findNodeForNewAAInstance(role);
+    if (candidates.isEmpty()) {
+      log.warn("No suitable location for {}", role.getName());
+      return null;
+    }
+    OutstandingRequest request = outstandingRequests.newAARequest(
+        role.getKey(), candidates, role.getLabelExpression());
+    Resource resource = recordFactory.newResource();
+    role.copyResourceRequirements(resource);
+    request.buildContainerRequest(resource, role, now());
+    return request;
+  }
+  /**
+   * List the nodes that are active for a role.
+   * This walks the node map, so it costs {@code O(nodes)}.
+   * @param role role index
+   * @return a possibly empty list of nodes with an instance of that role
+   */
+  public synchronized List<NodeInstance> listActiveNodes(int role) {
+    List<NodeInstance> active = nodemap.listActiveNodes(role);
+    return active;
+  }
+
+ /**
+ * Get the node entry of a container
+ * @param container container to look up
+ * @return the entry
+ * @throws RuntimeException if the container has no hostname
+ */
+ public NodeEntry getOrCreateNodeEntry(Container container) {
+ return getOrCreateNodeInstance(container).getOrCreate(container);
+ }
+
+ /**
+ * Get the node instance of a container -always returns something
+ * @param container container to look up
+ * @return a (possibly new) node instance
+ * @throws RuntimeException if the container has no hostname
+ */
+ public synchronized NodeInstance getOrCreateNodeInstance(Container container) {
+ return nodemap.getOrCreate(RoleHistoryUtils.hostnameOf(container));
+ }
+
+  /**
+   * Get the node instance of a host, if there is an entry for it in the
+   * node map.
+   * No instance is created by this lookup.
+   * @param hostname hostname to look up
+   * @return a node instance or null
+   */
+  public synchronized NodeInstance getExistingNodeInstance(String hostname) {
+    return nodemap.get(hostname);
+  }
+
+ /**
+ * Get the node instance of a container <i>if there's an entry in the history</i>
+ * @param container container to look up
+ * @return a node instance or null
+ * @throws RuntimeException if the container has no hostname
+ */
+ public synchronized NodeInstance getExistingNodeInstance(Container container) {
+ return nodemap.get(RoleHistoryUtils.hostnameOf(container));
+ }
+
+ /**
+ * Perform any pre-allocation operations on the list of allocated containers
+ * based on knowledge of system state.
+ * Currently this places requested hosts ahead of unrequested ones.
+ * @param allocatedContainers list of allocated containers
+ * @return list of containers potentially reordered
+ */
+ public synchronized List<Container> prepareAllocationList(List<Container> allocatedContainers) {
+
+ //partition into requested and unrequested
+ List<Container> requested =
+ new ArrayList<>(allocatedContainers.size());
+ List<Container> unrequested =
+ new ArrayList<>(allocatedContainers.size());
+ outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
+
+ //give the unrequested ones lower priority
+ requested.addAll(unrequested);
+ return requested;
+ }
+
+ /**
+ * A container has been allocated on a node -update the data structures
+ * <p>
+ * The allocation is passed to the outstanding request tracker.
+ * If the desired count has been reached ({@code desiredCount <= actualCount}),
+ * the role's outstanding requests are reset.
+ * Any hosts returned by that reset are appended to the role's recent-node
+ * list, which is then re-sorted.
+ * @param container container
+ * @param desiredCount desired #of instances
+ * @param actualCount current count of instances
+ * @return The allocation outcome
+ * @throws RuntimeException if the container has no node ID
+ */
+ public synchronized ContainerAllocationResults onContainerAllocated(Container container,
+ long desiredCount,
+ long actualCount) {
+ // role ID extracted from the container via ContainerPriority
+ int role = ContainerPriority.extractRole(container);
+
+ String hostname = RoleHistoryUtils.hostnameOf(container);
+ // NOTE(review): presumably the live recent-node list for the role (it is
+ // appended to and re-sorted below) - confirm it is not a copy
+ List<NodeInstance> nodeInstances = listRecentNodesForRoleId(role);
+ ContainerAllocationResults outcome =
+ outstandingRequests.onContainerAllocated(role, hostname, container);
+ if (desiredCount <= actualCount) {
+ // all outstanding requests have been satisfied
+ // clear all the lists, so returning nodes to the available set
+ List<NodeInstance> hosts = outstandingRequests.resetOutstandingRequests(role);
+ if (!hosts.isEmpty()) {
+ //add the list
+ log.info("Adding {} hosts for role {}", hosts.size(), role);
+ nodeInstances.addAll(hosts);
+ sortRecentNodeList(role);
+ }
+ }
+ return outcome;
+ }
+
+  /**
+   * Event: a container has been assigned to a role instance on a node.
+   * The container's node entry is marked as starting.
+   * @param container container
+   */
+  public void onContainerAssigned(Container container) {
+    NodeInstance instance = getOrCreateNodeInstance(container);
+    NodeEntry entry = instance.getOrCreate(container);
+    entry.onStarting();
+    log.debug("Node {} has updated NodeEntry {}", instance, entry);
+  }
+
+  /**
+   * Event: a container start has been submitted.
+   * The role history takes no action on this event.
+   * @param container container being started
+   * @param instance instance bound to the container
+   */
+  public void onContainerStartSubmitted(Container container,
+      RoleInstance instance) {
+    // intentionally a no-op
+  }
+
+  /**
+   * Event: a container has started.
+   * The container's node entry is told the start completed, and the history
+   * is {@code touch()}-ed.
+   * @param container container that just started
+   */
+  public void onContainerStarted(Container container) {
+    getOrCreateNodeEntry(container).onStartCompleted();
+    touch();
+  }
+
+ /**
+ * A container failed to start: update the node entry state
+ * and return the container to the queue
+ * @param container container that failed
+ * @return true if the node was queued
+ */
+ public boolean onNodeManagerContainerStartFailed(Container container) {
+ return markContainerFinished(container, false, true, ContainerOutcome.Failed);
+ }
+
+ /**
+ * Does the RoleHistory have enough information about the YARN cluster
+ * to start placing AA requests? That is: has it the node map and
+ * any label information needed?
+ * @return true if the caller can start requesting AA nodes
+ */
+ public boolean canPlaceAANodes() {
+ return nodeUpdateReceived.get();
+ }
+
+ /**
+ * Get the last time the nodes were updated from YARN
+ * @return the update time or zero if never updated.
+ */
+ public long getNodesUpdatedTime() {
+ return nodesUpdatedTime.get();
+ }
+
+ /**
+ * Update failedNodes and nodemap based on the node state
+ * <p>
+ * NOTE(review): this method only updates the node map, the update
+ * timestamp and the "node update received" flag; no failedNodes
+ * structure is touched here.
+ * Reports without a node ID, with an empty host, or with no node state
+ * are skipped with a warning.
+ *
+ * @param updatedNodes list of updated nodes
+ * @return true if a review should be triggered, i.e. if any node map
+ * update reported a change.
+ */
+ public synchronized boolean onNodesUpdated(List<NodeReport> updatedNodes) {
+ log.debug("Updating {} nodes", updatedNodes.size());
+ nodesUpdatedTime.set(now());
+ // from now on canPlaceAANodes() reports true
+ nodeUpdateReceived.set(true);
+ int printed = 0;
+ boolean triggerReview = false;
+ for (NodeReport updatedNode : updatedNodes) {
+ // NOTE(review): a non-null NodeId whose getHost() returns null would NPE
+ // on isEmpty() below - confirm YARN never reports that
+ String hostname = updatedNode.getNodeId() == null
+ ? ""
+ : updatedNode.getNodeId().getHost();
+ NodeState nodeState = updatedNode.getNodeState();
+ if (hostname.isEmpty() || nodeState == null) {
+ log.warn("Ignoring incomplete update");
+ continue;
+ }
+ // printed is only incremented while debug logging is enabled
+ if (log.isDebugEnabled() && printed++ < 10) {
+ // log the first few, but avoid overloading the logs for a full cluster
+ // update
+ log.debug("Node \"{}\" is in state {}", hostname, nodeState);
+ }
+ // update the node; this also creates an instance if needed
+ boolean updated = nodemap.updateNode(hostname, updatedNode);
+ triggerReview |= updated;
+ }
+ return triggerReview;
+ }
+
+  /**
+   * Event: a release request has been issued for a container.
+   * The container's node entry is told about the release.
+   * @param container container submitted
+   */
+  public void onContainerReleaseSubmitted(Container container) {
+    getOrCreateNodeEntry(container).release();
+  }
+
+ /**
+ * App state notified of a container completed
+ * @param container completed container
+ * @return true if the node was queued
+ */
+ public boolean onReleaseCompleted(Container container) {
+ return markContainerFinished(container, true, false, ContainerOutcome.Failed);
+ }
+
+  /**
+   * Event: a container completed without having been released, so it is
+   * treated as failed.
+   *
+   * @param container completed container
+   * @param shortLived was the container short lived?
+   * @param outcome outcome of the container, passed on to the node entry
+   * @return true if the node is considered available for work
+   */
+  public boolean onFailedContainer(Container container,
+      boolean shortLived,
+      ContainerOutcome outcome) {
+    boolean released = false;
+    return markContainerFinished(container, released, shortLived, outcome);
+  }
+
+ /**
+ * Mark a container finished; if it was released then that is treated
+ * differently. history is {@code touch()}-ed
+ *
+ *
+ * @param container completed container
+ * @param wasReleased was the container released?
+ * @param shortLived was the container short lived?
+ * @param outcome
+ * @return true if the node was queued
+ */
+ protected synchronized boolean markContainerFinished(Container container,
+ boolean wasReleased,
+ boolean shortLived,
+ ContainerOutcome outcome) {
+ NodeEntry nodeEntry = getOrCreateNodeEntry(container);
+ log.info("Finished container for node {}, released={}, shortlived={}",
+ nodeEntry.rolePriority, wasReleased, shortLived);
+ boolean available;
+ if (shortLived) {
+ nodeEntry.onStartFailed();
+ available = false;
+ } else {
+ available = nodeEntry.containerCompleted(wasReleased, outcome);
+ maybeQueueNodeForWork(container, nodeEntry, available);
+ }
+ touch();
+ return available;
+ }
+
+  /**
+   * If the node is available, queue it for assignments.
+   * Queuing stamps the entry's last-used time and pushes the node instance
+   * onto the front of the recent-node list of the container's role.
+   * Unsynced: the caller is expected to hold the lock.
+   * @param container completed container
+   * @param nodeEntry node
+   * @param available available flag
+   * @return the available flag, i.e. true if the node was queued
+   */
+  private boolean maybeQueueNodeForWork(Container container,
+      NodeEntry nodeEntry,
+      boolean available) {
+    if (!available) {
+      return false;
+    }
+    // node is free
+    nodeEntry.setLastUsed(now());
+    NodeInstance instance = getOrCreateNodeInstance(container);
+    int roleId = ContainerPriority.extractRole(container);
+    log.debug("Node {} is now available for role id {}", instance, roleId);
+    listRecentNodesForRoleId(roleId).addFirst(instance);
+    return true;
+  }
+
+ /**
+ * Print the history to the log. This is for testing and diagnostics
+ */
+ public synchronized void dump() {
+ for (ProviderRole role : providerRoles) {
+ log.info(role.toString());
+ List<NodeInstance> instances = listRecentNodesForRoleId(role.id);
+ log.info(" available: " + instances.size()
+ + " " + SliderUtils.joinWithInnerSeparator(" ", instances));
+ }
+
+ log.info("Nodes in Cluster: {}", getClusterSize());
+ for (NodeInstance node : nodemap.values()) {
+ log.info(node.toFullString());
+ }
+ }
+
+  /**
+   * Build the role name to role ID mapping that is persisted with the role
+   * history.
+   * @return a new map of role name to role ID
+   */
+  public synchronized Map<CharSequence, Integer> buildMappingForHistoryFile() {
+    Map<CharSequence, Integer> nameToId = new HashMap<>(getRoleSize());
+    for (ProviderRole providerRole : providerRoles) {
+      nameToId.put(providerRole.name, providerRole.id);
+    }
+    return nameToId;
+  }
+
+  /**
+   * Copy the recent-node list of a role.
+   * @param role role index
+   * @return a new {@code LinkedList} holding the same node instances
+   */
+  @VisibleForTesting
+  public List<NodeInstance> cloneRecentNodeList(int role) {
+    List<NodeInstance> source = listRecentNodesForRoleId(role);
+    return new LinkedList<>(source);
+  }
+
+  /**
+   * Get a snapshot of the outstanding placed request list.
+   * @return a list of the placed requests outstanding at the time of the call
+   */
+  @VisibleForTesting
+  public List<OutstandingRequest> listPlacedRequests() {
+    List<OutstandingRequest> snapshot = outstandingRequests.listPlacedRequests();
+    return snapshot;
+  }
+
+  /**
+   * Get a snapshot of the outstanding open request list.
+   * These are requests without a specific placement.
+   * @return a list of the open requests outstanding at the time of the call
+   */
+  @VisibleForTesting
+  public List<OutstandingRequest> listOpenRequests() {
+    return outstandingRequests.listOpenRequests();
+  }
+
+ /**
+ * Escalate operation as triggered by external timer.
+ * @return a (usually empty) list of cancel/request operations.
+ */
+ public synchronized List<AbstractRMOperation> escalateOutstandingRequests() {
+ return outstandingRequests.escalateOutstandingRequests(now());
+ }
+  /**
+   * Cancel the outstanding anti-affine (AA) placement requests held by the
+   * outstanding request tracker.
+   * @return the operations to submit to perform the cancellation
+   */
+  public List<AbstractRMOperation> cancelOutstandingAARequests() {
+    return outstandingRequests.cancelOutstandingAARequests();
+  }
+
+  /**
+   * Cancel a number of outstanding requests for a role.
+   * These are requests for new containers, not actual containers.
+   * Anti-affine roles and simple roles use different cancellation strategies.
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  public List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
+    if (role.isAntiAffinePlacement()) {
+      return cancelRequestsForAARole(role, toCancel);
+    }
+    return cancelRequestsForSimpleRole(role, toCancel);
+  }
+
+  /**
+   * Build the list of requests to cancel for a non anti-affine role.
+   * Open (unplaced) requests are chosen first.
+   * Placed requests are used to make up any shortfall.
+   * @param role role
+   * @param toCancel number to cancel; must be positive
+   * @return a list of cancellable operations.
+   * @throws IllegalArgumentException if {@code toCancel <= 0}
+   */
+  private synchronized List<AbstractRMOperation> cancelRequestsForSimpleRole(RoleStatus role, int toCancel) {
+    Preconditions.checkArgument(toCancel > 0,
+        "trying to cancel invalid number of requests: " + toCancel);
+    List<AbstractRMOperation> cancellations = new ArrayList<>(toCancel);
+    int roleId = role.getKey();
+    List<OutstandingRequest> victims =
+        outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+    // top up from the placed requests if there were not enough open ones
+    int shortfall = toCancel - victims.size();
+    victims.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, shortfall));
+
+    for (OutstandingRequest victim : victims) {
+      cancellations.add(victim.createCancelOperation());
+    }
+    return cancellations;
+  }
+
+  /**
+   * Build the list of requests to cancel for an anti-affine (AA) role.
+   * The cancellation is applied in this order:
+   * <ol>
+   *   <li>the role's pending AA request count is reduced;</li>
+   *   <li>if any remain to cancel and an AA request is outstanding, the open
+   *   requests of the role are extracted and the outstanding AA request is
+   *   cancelled;</li>
+   *   <li>placed requests are extracted for whatever is left.</li>
+   * </ol>
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  private synchronized List<AbstractRMOperation> cancelRequestsForAARole(RoleStatus role, int toCancel) {
+    List<AbstractRMOperation> results = new ArrayList<>(toCancel);
+    int roleId = role.getKey();
+    List<OutstandingRequest> requests = new ArrayList<>(toCancel);
+    // there may be pending requests which can be cancelled here
+    long pending = role.getPendingAntiAffineRequests();
+    if (pending > 0) {
+      // there are some pending ones which can be cancelled first
+      long pendingToCancel = Math.min(pending, toCancel);
+      // log the number actually cancelled and the number left pending
+      log.info("Cancelling {} pending AA allocations, leaving {}", pendingToCancel,
+          pending - pendingToCancel);
+      role.setPendingAntiAffineRequests(pending - pendingToCancel);
+      toCancel -= pendingToCancel;
+    }
+    if (toCancel > 0 && role.isAARequestOutstanding()) {
+      // not enough
+      log.info("Cancelling current AA request");
+      // find the single entry which may be running; copy into our own list
+      // rather than aliasing (and later appending to) the returned list
+      requests.addAll(outstandingRequests.extractOpenRequestsForRole(roleId, toCancel));
+      role.cancelOutstandingAARequest();
+      toCancel--;
+    }
+
+    // ask for some excess nodes
+    if (toCancel > 0) {
+      requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, toCancel));
+    }
+
+    // build cancellations
+    for (OutstandingRequest request : requests) {
+      results.add(request.createCancelOperation());
+    }
+    return results;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java
new file mode 100644
index 0000000..ea6197b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.slider.common.tools.SliderUtils;
+
+public class RoleHistoryUtils {
+
+  /**
+   * Get the hostname of the node a container was allocated on.
+   * @param container container
+   * @return the host part of the container's node ID
+   * @throws RuntimeException if the container has no node ID
+   */
+  public static String hostnameOf(Container container) {
+    NodeId nodeId = container.getNodeId();
+    if (nodeId == null) {
+      // plain concatenation: the message is not a format string
+      throw new RuntimeException("Container has no node ID: "
+          + SliderUtils.containerToString(container));
+    }
+    return nodeId.getHost();
+  }
+
+  /**
+   * Decrement a value but hold it at zero. Usually a sanity check
+   * on counters tracking outstanding operations
+   * @param val value
+   * @return {@code val - 1}, or 0 if that would be negative
+   */
+  public static int decToFloor(int val) {
+    int v = val - 1;
+    return v < 0 ? 0 : v;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java
new file mode 100644
index 0000000..920887a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.Objects;
+
+public class RoleHostnamePair {
+
+  /**
+   * requested role
+   */
+  public final int roleId;
+
+  /**
+   * hostname; may be null (equality and hashing treat null as a value)
+   */
+  public final String hostname;
+
+  public RoleHostnamePair(int roleId, String hostname) {
+    this.roleId = roleId;
+    this.hostname = hostname;
+  }
+
+  public int getRoleId() {
+    return roleId;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  /**
+   * Two pairs are equal when both the role ID and the (possibly null)
+   * hostname match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof RoleHostnamePair)) {
+      return false;
+    }
+    RoleHostnamePair that = (RoleHostnamePair) o;
+    // compare the primitive directly rather than boxing via Objects.equals
+    return roleId == that.roleId &&
+        Objects.equals(hostname, that.hostname);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(roleId, hostname);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "RoleHostnamePair{");
+    sb.append("roleId=").append(roleId);
+    sb.append(", hostname='").append(hostname).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
new file mode 100644
index 0000000..30cfec9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.types.ContainerInformation;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tracking information about a container
+ */
+public final class RoleInstance implements Cloneable {
+
+  /**
+   * The container this instance tracks.
+   */
+  public Container container;
+  /**
+   * Container ID
+   */
+  public final String id;
+  public long createTime;
+  public long startTime;
+  /**
+   * flag set when it is released, to know if it has
+   * already been targeted for termination
+   */
+  public boolean released;
+
+  /**
+   * Name of the role
+   */
+  public String role;
+  public String group;
+
+  /**
+   * Version of the app
+   */
+  public String appVersion;
+
+  /**
+   * Role Id; matches priority in resources.json
+   */
+  public int roleId;
+
+  /**
+   * state from StateValues
+   */
+  public int state;
+
+  /**
+   * Exit code: only valid if the state >= STOPPED
+   */
+  public int exitCode;
+
+  /**
+   * what was the command executed?
+   */
+  public String command;
+
+  /**
+   * Any diagnostics
+   */
+  public String diagnostics;
+
+  /**
+   * What is the tail output from the executed process (or [] if not started
+   * or the log cannot be picked up)
+   */
+  public String[] output;
+
+  /**
+   * Any environment details
+   */
+  public String[] environment;
+
+  public String ip;
+  public String hostname;
+  /**
+   * Host of the container's node ID; left null if the container has no node ID.
+   */
+  public String host;
+  /**
+   * "http://" + the node HTTP address; left null if the container has none.
+   */
+  public String hostURL;
+  public ContainerAllocationOutcome placement;
+
+
+  /**
+   * A list of registered endpoints.
+   */
+  private List<Endpoint> endpoints =
+      new ArrayList<>(2);
+
+  /**
+   * Create an instance from a container assignment, recording the
+   * assignment's placement outcome.
+   * @param assignment the container assignment
+   */
+  public RoleInstance(ContainerAssignment assignment) {
+    this(assignment.container);
+    placement = assignment.placement;
+  }
+  /**
+   * Create an instance to track an allocated container
+   * @param container a container which must be non null, and have a non-null Id field.
+   */
+  public RoleInstance(Container container) {
+    Preconditions.checkNotNull(container, "Null container");
+    Preconditions.checkState(container.getId() != null,
+        "Null container ID");
+
+    this.container = container;
+    id = container.getId().toString();
+    if (container.getNodeId() != null) {
+      host = container.getNodeId().getHost();
+    }
+    if (container.getNodeHttpAddress() != null) {
+      hostURL = "http://" + container.getNodeHttpAddress();
+    }
+  }
+
+  public ContainerId getId() {
+    return container.getId();
+  }
+
+  public NodeId getHost() {
+    return container.getNodeId();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+        new StringBuilder("RoleInstance{");
+    sb.append("role='").append(role).append('\'');
+    sb.append(", id='").append(id).append('\'');
+    sb.append(", container=").append(SliderUtils.containerToString(container));
+    sb.append(", createTime=").append(createTime);
+    sb.append(", startTime=").append(startTime);
+    sb.append(", released=").append(released);
+    sb.append(", roleId=").append(roleId);
+    sb.append(", host=").append(host);
+    sb.append(", hostURL=").append(hostURL);
+    sb.append(", state=").append(state);
+    sb.append(", placement=").append(placement);
+    sb.append(", exitCode=").append(exitCode);
+    sb.append(", command='").append(command).append('\'');
+    sb.append(", diagnostics='").append(diagnostics).append('\'');
+    sb.append(", output=").append(Arrays.toString(output));
+    sb.append(", environment=").append(Arrays.toString(environment));
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public ContainerId getContainerId() {
+    return container != null ? container.getId() : null;
+  }
+
+  /**
+   * Generate the protobuf form of this instance.
+   * @return protobuf format. This excludes the Container info
+   */
+  public Messages.RoleInstanceState toProtobuf() {
+    Messages.RoleInstanceState.Builder builder =
+        Messages.RoleInstanceState.newBuilder();
+    if (container != null) {
+      builder.setName(container.getId().toString());
+    } else {
+      builder.setName("unallocated instance");
+    }
+    if (command != null) {
+      builder.setCommand(command);
+    }
+    if (environment != null) {
+      builder.addAllEnvironment(Arrays.asList(environment));
+    }
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    builder.setExitCode(exitCode);
+
+    if (output != null) {
+      builder.addAllOutput(Arrays.asList(output));
+    }
+    if (role != null) {
+      builder.setRole(role);
+    }
+    builder.setRoleId(roleId);
+    builder.setState(state);
+
+    builder.setReleased(released);
+    builder.setCreateTime(createTime);
+    builder.setStartTime(startTime);
+    // host and hostURL stay null when the container lacked a node ID or
+    // HTTP address; protobuf builder setters reject null, so send "" instead
+    builder.setHost(host != null ? host : "");
+    builder.setHostURL(hostURL != null ? hostURL : "");
+    if (appVersion != null) {
+      builder.setAppVersion(appVersion);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Build a serializable ClusterNode structure from this instance.
+   * This operation is unsynchronized.
+   * @return a serialized value.
+   */
+  public ClusterNode toClusterNode() {
+    ClusterNode node;
+    if (container != null) {
+      node = new ClusterNode(container.getId());
+    } else {
+      node = new ClusterNode();
+      node.name = "unallocated instance";
+    }
+    node.command = command;
+    node.createTime = createTime;
+    node.diagnostics = diagnostics;
+    if (environment != null) {
+      node.environment = Arrays.copyOf(environment, environment.length);
+    }
+    node.exitCode = exitCode;
+    node.ip = ip;
+    node.hostname = hostname;
+    node.host = host;
+    node.hostUrl = hostURL;
+    if (output != null) {
+      node.output = Arrays.copyOf(output, output.length);
+    }
+    node.released = released;
+    node.role = role;
+    node.roleId = roleId;
+    node.startTime = startTime;
+    node.state = state;
+
+    return node;
+  }
+
+  /**
+   * Shallow clone.
+   * All fields are copied as-is, so the Container object and the output and
+   * environment arrays are shared with the original.
+   * Only the endpoint list is copied, into a new list holding the same
+   * endpoint objects.
+   * @return a clone of the object
+   * @throws CloneNotSupportedException
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    RoleInstance cloned = (RoleInstance) super.clone();
+    // clone the endpoint list, but not the values
+    cloned.endpoints = new ArrayList<Endpoint>(this.endpoints);
+    return cloned;
+  }
+
+  /**
+   * Get the list of endpoints.
+   * @return the endpoint list (the live list, not a copy).
+   */
+  public List<Endpoint> getEndpoints() {
+    return endpoints;
+  }
+
+  /**
+   * Add an endpoint registration
+   * @param endpoint endpoint (non-null)
+   */
+  public void addEndpoint(Endpoint endpoint) {
+    Preconditions.checkArgument(endpoint != null);
+    endpoints.add(endpoint);
+  }
+
+  /**
+   * Register a port endpoint as an inet-addr formatted endpoint, using the
+   * host as the first part of the address
+   * @param port port
+   * @param api API name
+   */
+  public void registerPortEndpoint(int port, String api) {
+    Endpoint epr =
+        RegistryTypeUtils.inetAddrEndpoint(api,
+            ProtocolTypes.PROTOCOL_TCP, host, port);
+    addEndpoint(epr);
+  }
+
+  /**
+   * Serialize. Some data structures (e.g output)
+   * may be shared
+   * @return a serialized form for marshalling as JSON
+   */
+  public ContainerInformation serialize() {
+    ContainerInformation info = new ContainerInformation();
+    info.containerId = id;
+    info.component = role;
+    info.appVersion = appVersion;
+    info.startTime = startTime;
+    info.createTime = createTime;
+    info.diagnostics = diagnostics;
+    info.state = state;
+    info.host = host;
+    info.hostURL = hostURL;
+    info.released = released ? Boolean.TRUE : null;
+    if (placement != null) {
+      info.placement = placement.toString();
+    }
+    if (output != null) {
+      info.output = output;
+    }
+    return info;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
new file mode 100644
index 0000000..0a3a3c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.RoleStatistics;
+import org.apache.slider.providers.PlacementPolicy;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.BoolMetricPredicate;
+import org.apache.slider.server.appmaster.management.LongGauge;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Models the ongoing status of a single role (component) of an application.
+ * This covers desired, actual, requested and releasing container counts,
+ * failure statistics, and any outstanding anti-affine placement request.
+ *
+ * These structures are shared across the {@link AppState} and
+ * {@link RoleHistory} structures, and must be designed for concurrent access.
+ * Atomic counters are preferred to anything which requires synchronization.
+ * Synchronized access is useful where the whole instance must be locked
+ * while multiple entries are updated.
+ */
+public final class RoleStatus implements Cloneable, MetricSet {
+
+  private final String name;
+  private final String group;
+
+  /**
+   * Role priority
+   */
+  private final int key;
+  private final ProviderRole providerRole;
+
+  private final LongGauge actual = new LongGauge();
+  private final LongGauge completed = new LongGauge();
+  private final LongGauge desired = new LongGauge();
+  private final LongGauge failed = new LongGauge();
+  private final LongGauge failedRecently = new LongGauge(0);
+  private final LongGauge limitsExceeded = new LongGauge(0);
+  private final LongGauge nodeFailed = new LongGauge(0);
+  /** Number of AA requests queued. */
+  private final LongGauge pendingAntiAffineRequests = new LongGauge(0);
+  private final LongGauge preempted = new LongGauge(0);
+  private final LongGauge releasing = new LongGauge();
+  private final LongGauge requested = new LongGauge();
+  private final LongGauge started = new LongGauge();
+  private final LongGauge startFailed = new LongGauge();
+  private final LongGauge totalRequested = new LongGauge();
+
+  /** resource requirements */
+  private Resource resourceRequirements;
+
+  /** any pending AA request */
+  private volatile OutstandingRequest outstandingAArequest = null;
+
+  /** Most recent non-null failure text passed to {@link #noteFailed}. */
+  private String failureMessage = "";
+
+  /**
+   * Create the status for a role.
+   * @param providerRole role definition; supplies the name, group and key
+   */
+  public RoleStatus(ProviderRole providerRole) {
+    this.providerRole = providerRole;
+    this.name = providerRole.name;
+    this.group = providerRole.group;
+    this.key = providerRole.id;
+  }
+
+  /**
+   * Copy constructor used by {@link #clone()}.
+   * Every counter of the new instance is set to the source's current value,
+   * so the two instances do not share mutable gauges.
+   * @param source instance to copy; the caller is expected to hold its lock
+   */
+  private RoleStatus(RoleStatus source) {
+    this.providerRole = source.providerRole;
+    this.name = source.name;
+    this.group = source.group;
+    this.key = source.key;
+    actual.set(source.actual.get());
+    completed.set(source.completed.get());
+    desired.set(source.desired.get());
+    failed.set(source.failed.get());
+    failedRecently.set(source.failedRecently.get());
+    limitsExceeded.set(source.limitsExceeded.get());
+    nodeFailed.set(source.nodeFailed.get());
+    pendingAntiAffineRequests.set(source.pendingAntiAffineRequests.get());
+    preempted.set(source.preempted.get());
+    releasing.set(source.releasing.get());
+    requested.set(source.requested.get());
+    started.set(source.started.get());
+    startFailed.set(source.startFailed.get());
+    totalRequested.set(source.totalRequested.get());
+    this.resourceRequirements = source.resourceRequirements;
+    this.outstandingAArequest = source.outstandingAArequest;
+    this.failureMessage = source.failureMessage;
+  }
+
+  /**
+   * Expose every counter of this role, plus a predicate metric reporting
+   * whether an anti-affine request is outstanding.
+   * @return a new map of metric name to metric
+   */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    // 14 counters plus the predicate: room for 15 entries without rehashing
+    Map<String, Metric> metrics = new HashMap<>(32);
+    metrics.put("actual", actual);
+    metrics.put("completed", completed);
+    metrics.put("desired", desired);
+    metrics.put("failed", failed);
+    metrics.put("failedRecently", failedRecently);
+    metrics.put("limitsExceeded", limitsExceeded);
+    metrics.put("nodeFailed", nodeFailed);
+    metrics.put("pendingAntiAffineRequests", pendingAntiAffineRequests);
+    metrics.put("preempted", preempted);
+    metrics.put("releasing", releasing);
+    metrics.put("requested", requested);
+    metrics.put("started", started);
+    metrics.put("startFailed", startFailed);
+    metrics.put("totalRequested", totalRequested);
+
+    metrics.put("outstandingAArequest",
+        new BoolMetricPredicate(new BoolMetricPredicate.Eval() {
+          @Override
+          public boolean eval() {
+            return isAARequestOutstanding();
+          }
+        }));
+    return metrics;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getGroup() {
+    return group;
+  }
+
+  public int getKey() {
+    return key;
+  }
+
+  /**
+   * The priority of this role; this is the same value as {@link #getKey()}.
+   * @return the role priority
+   */
+  public int getPriority() {
+    return getKey();
+  }
+
+  /**
+   * Get the placement policy bitmask, built from the flags in
+   * {@link PlacementPolicy}.
+   * @return the placement policy for this role
+   */
+  public int getPlacementPolicy() {
+    return providerRole.placementPolicy;
+  }
+
+  public long getPlacementTimeoutSeconds() {
+    return providerRole.placementTimeoutSeconds;
+  }
+
+  /**
+   * The number of failures on a specific node that can be tolerated
+   * before selecting a different node for placement.
+   * @return the node failure threshold for this role
+   */
+  public int getNodeFailureThreshold() {
+    return providerRole.nodeFailureThreshold;
+  }
+
+  public boolean isExcludeFromFlexing() {
+    return hasPlacementPolicy(PlacementPolicy.EXCLUDE_FROM_FLEXING);
+  }
+
+  public boolean isStrictPlacement() {
+    return hasPlacementPolicy(PlacementPolicy.STRICT);
+  }
+
+  public boolean isAntiAffinePlacement() {
+    return hasPlacementPolicy(PlacementPolicy.ANTI_AFFINITY_REQUIRED);
+  }
+
+  /**
+   * Test the placement policy bitmask against the given flag(s).
+   * @param policy flag bits to test
+   * @return true if any of the given bits are set
+   */
+  public boolean hasPlacementPolicy(int policy) {
+    return 0 != (getPlacementPolicy() & policy);
+  }
+
+  public boolean isPlacementDesired() {
+    return !hasPlacementPolicy(PlacementPolicy.ANYWHERE);
+  }
+
+  public long getDesired() {
+    return desired.get();
+  }
+
+  public void setDesired(long desired) {
+    this.desired.set(desired);
+  }
+
+  public long getActual() {
+    return actual.get();
+  }
+
+  public long incActual() {
+    return actual.incrementAndGet();
+  }
+
+  public long decActual() {
+    return actual.decToFloor(1);
+  }
+
+  /**
+   * Get the request count.
+   * @return a count of requested containers
+   */
+  public long getRequested() {
+    return requested.get();
+  }
+
+  /**
+   * Note a new container request; also bumps the lifetime request total.
+   * @return the updated count of requested containers
+   */
+  public long incRequested() {
+    totalRequested.incrementAndGet();
+    return requested.incrementAndGet();
+  }
+
+  /**
+   * Reduce the outstanding request count, via {@code decToFloor}.
+   * @param count number of requests cancelled
+   */
+  public void cancel(long count) {
+    requested.decToFloor(count);
+  }
+
+  public void decRequested() {
+    cancel(1);
+  }
+
+  public long getReleasing() {
+    return releasing.get();
+  }
+
+  public long incReleasing() {
+    return releasing.incrementAndGet();
+  }
+
+  public long decReleasing() {
+    return releasing.decToFloor(1);
+  }
+
+  public long getFailed() {
+    return failed.get();
+  }
+
+  public long getFailedRecently() {
+    return failedRecently.get();
+  }
+
+  /**
+   * Reset the recent failure count to zero.
+   * @return the number of failures in the "recent" window before the reset
+   */
+  public long resetFailedRecently() {
+    return failedRecently.getAndSet(0);
+  }
+
+  public long getLimitsExceeded() {
+    return limitsExceeded.get();
+  }
+
+  public long incPendingAntiAffineRequests(long v) {
+    return pendingAntiAffineRequests.addAndGet(v);
+  }
+
+  /**
+   * Probe for an outstanding AA request.
+   * @return true if there is an outstanding AA Request
+   */
+  public boolean isAARequestOutstanding() {
+    return outstandingAArequest != null;
+  }
+
+  /**
+   * Expose the predicate {@link #isAARequestOutstanding()} as an integer,
+   * which is very convenient in tests.
+   * @return 1 if there is an outstanding request; 0 if not
+   */
+  public int getOutstandingAARequestCount() {
+    return isAARequestOutstanding() ? 1 : 0;
+  }
+
+  /**
+   * Note that a role failed; the text will be used in any diagnostics if an
+   * exception is later raised.
+   * Preemptions only count as preemptions; node failures count as failures
+   * but not as recent failures; everything else counts as a recent failure
+   * and, if {@code startupFailure} is set, as a start failure.
+   * @param startupFailure flag to indicate this was a startup event
+   * @param text text about the failure; ignored if null
+   * @param outcome outcome of the container
+   */
+  public synchronized void noteFailed(boolean startupFailure, String text,
+      ContainerOutcome outcome) {
+    if (text != null) {
+      failureMessage = text;
+    }
+    switch (outcome) {
+      case Preempted:
+        preempted.incrementAndGet();
+        break;
+
+      case Node_failure:
+        nodeFailed.incrementAndGet();
+        failed.incrementAndGet();
+        break;
+
+      case Failed_limits_exceeded: // exceeded memory or CPU; app/configuration related
+        limitsExceeded.incrementAndGet();
+        // fall through
+      case Failed: // application failure, possibly node related, possibly not
+      default: // anything else (future-proofing)
+        failed.incrementAndGet();
+        failedRecently.incrementAndGet();
+        // have a look to see if it was short lived
+        if (startupFailure) {
+          incStartFailed();
+        }
+        break;
+    }
+  }
+
+  public long getStartFailed() {
+    return startFailed.get();
+  }
+
+  public synchronized void incStartFailed() {
+    startFailed.getAndIncrement();
+  }
+
+  public synchronized String getFailureMessage() {
+    return failureMessage;
+  }
+
+  public long getCompleted() {
+    return completed.get();
+  }
+
+  public synchronized void setCompleted(int completed) {
+    this.completed.set(completed);
+  }
+
+  public long incCompleted() {
+    return completed.incrementAndGet();
+  }
+
+  public long getStarted() {
+    return started.get();
+  }
+
+  public synchronized void incStarted() {
+    started.incrementAndGet();
+  }
+
+  public long getTotalRequested() {
+    return totalRequested.get();
+  }
+
+  public long getPreempted() {
+    return preempted.get();
+  }
+
+  public long getNodeFailed() {
+    return nodeFailed.get();
+  }
+
+  public long getPendingAntiAffineRequests() {
+    return pendingAntiAffineRequests.get();
+  }
+
+  public void setPendingAntiAffineRequests(long pendingAntiAffineRequests) {
+    this.pendingAntiAffineRequests.set(pendingAntiAffineRequests);
+  }
+
+  public long decPendingAntiAffineRequests() {
+    return pendingAntiAffineRequests.decToFloor(1);
+  }
+
+  public OutstandingRequest getOutstandingAArequest() {
+    return outstandingAArequest;
+  }
+
+  public void setOutstandingAArequest(OutstandingRequest outstandingAArequest) {
+    this.outstandingAArequest = outstandingAArequest;
+  }
+
+  /**
+   * Complete the outstanding AA request. There is no check for one in
+   * progress; the caller is expected to have done that.
+   */
+  public void completeOutstandingAARequest() {
+    setOutstandingAArequest(null);
+  }
+
+  /**
+   * Cancel any outstanding AA request. This is harmless if the role is
+   * non-AA, or if there are no outstanding requests. When a request is
+   * cancelled, the pending AA count is reset and the requested count is
+   * decremented.
+   */
+  public void cancelOutstandingAARequest() {
+    if (outstandingAArequest != null) {
+      setOutstandingAArequest(null);
+      setPendingAntiAffineRequests(0);
+      decRequested();
+    }
+  }
+
+  /**
+   * Get the number of containers this role is short of (positive) or has in
+   * excess (negative), comparing desired against actual + requested.
+   * When shrinking, containers already being released are discounted, and
+   * that adjustment never turns the result positive.
+   * @return the positive or negative number of containers to add/release;
+   * 0 means "do nothing".
+   */
+  public long getDelta() {
+    long inuse = getActualAndRequested();
+    long delta = desired.get() - inuse;
+    if (delta < 0) {
+      // if we are releasing, remove the number that are already released.
+      delta += releasing.get();
+      // but never switch to a positive
+      delta = Math.min(delta, 0);
+    }
+    return delta;
+  }
+
+  /**
+   * Get count of actual and requested containers. This includes pending ones.
+   * @return the size of the application when outstanding requests are included.
+   */
+  public long getActualAndRequested() {
+    return actual.get() + requested.get();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("RoleStatus{");
+    sb.append("name='").append(name).append('\'');
+    sb.append(", group=").append(group);
+    sb.append(", key=").append(key);
+    // use get() so the numeric value is printed, whatever LongGauge.toString() does
+    sb.append(", desired=").append(desired.get());
+    sb.append(", actual=").append(actual.get());
+    sb.append(", requested=").append(requested.get());
+    sb.append(", releasing=").append(releasing.get());
+    sb.append(", failed=").append(failed.get());
+    sb.append(", startFailed=").append(startFailed.get());
+    sb.append(", started=").append(started.get());
+    sb.append(", completed=").append(completed.get());
+    sb.append(", totalRequested=").append(totalRequested.get());
+    sb.append(", preempted=").append(preempted.get());
+    sb.append(", nodeFailed=").append(nodeFailed.get());
+    sb.append(", failedRecently=").append(failedRecently.get());
+    sb.append(", limitsExceeded=").append(limitsExceeded.get());
+    sb.append(", resourceRequirements=").append(resourceRequirements);
+    sb.append(", isAntiAffinePlacement=").append(isAntiAffinePlacement());
+    if (isAntiAffinePlacement()) {
+      sb.append(", pendingAntiAffineRequests=")
+          .append(pendingAntiAffineRequests.get());
+      sb.append(", outstandingAArequest=").append(outstandingAArequest);
+    }
+    sb.append(", failureMessage='").append(failureMessage).append('\'');
+    sb.append(", providerRole=").append(providerRole);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Create a snapshot of this role status.
+   * A field-by-field {@code super.clone()} would share the final, mutable
+   * counter objects, so later updates to this instance would show up in
+   * the "clone". Instead, the copy gets its own counters holding the values
+   * current at the time of the call. The provider role, resource
+   * requirements and any outstanding AA request are shared by reference.
+   * @return a new {@code RoleStatus}
+   * @throws CloneNotSupportedException never; retained for compatibility
+   */
+  @Override
+  public synchronized Object clone() throws CloneNotSupportedException {
+    return new RoleStatus(this);
+  }
+
+  /**
+   * Get the provider role
+   * @return the provider role
+   */
+  public ProviderRole getProviderRole() {
+    return providerRole;
+  }
+
+  /**
+   * Build the statistics map from the current data
+   * @return a map for use in statistics reports
+   */
+  public Map<String, Integer> buildStatistics() {
+    ComponentInformation componentInformation = serialize();
+    return componentInformation.buildStatistics();
+  }
+
+  /**
+   * Produce a serialized form which can be served up as JSON.
+   * Counters are narrowed to {@code int} via {@code intValue()}.
+   * @return a summary of the current role status.
+   */
+  public synchronized ComponentInformation serialize() {
+    ComponentInformation info = new ComponentInformation();
+    info.name = name;
+    info.priority = getPriority();
+    info.desired = desired.intValue();
+    info.actual = actual.intValue();
+    info.requested = requested.intValue();
+    info.releasing = releasing.intValue();
+    info.failed = failed.intValue();
+    info.startFailed = startFailed.intValue();
+    info.placementPolicy = getPlacementPolicy();
+    info.failureMessage = failureMessage;
+    info.totalRequested = totalRequested.intValue();
+    info.failedRecently = failedRecently.intValue();
+    info.nodeFailed = nodeFailed.intValue();
+    info.preempted = preempted.intValue();
+    info.pendingAntiAffineRequestCount = pendingAntiAffineRequests.intValue();
+    info.isAARequestOutstanding = isAARequestOutstanding();
+    return info;
+  }
+
+  /**
+   * Get the (possibly null) label expression for this role
+   * @return a string or null
+   */
+  public String getLabelExpression() {
+    return providerRole.labelExpression;
+  }
+
+  public Resource getResourceRequirements() {
+    return resourceRequirements;
+  }
+
+  public void setResourceRequirements(Resource resourceRequirements) {
+    this.resourceRequirements = resourceRequirements;
+  }
+
+  /**
+   * Compare two role status entries by name
+   */
+  public static class CompareByName implements Comparator<RoleStatus>,
+      Serializable {
+    @Override
+    public int compare(RoleStatus o1, RoleStatus o2) {
+      return o1.getName().compareTo(o2.getName());
+    }
+  }
+
+  /**
+   * Compare two role status entries by key
+   */
+  public static class CompareByKey implements Comparator<RoleStatus>,
+      Serializable {
+    @Override
+    public int compare(RoleStatus o1, RoleStatus o2) {
+      return Integer.compare(o1.getKey(), o2.getKey());
+    }
+  }
+
+  /**
+   * Given a resource, set its memory and virtual cores to those this role
+   * needs.
+   * @param resource resource to configure
+   * @return the resource
+   * @throws NullPointerException if the role's resource requirements have
+   * not been set
+   */
+  public Resource copyResourceRequirements(Resource resource) {
+    Preconditions.checkNotNull(resourceRequirements,
+        "Role resource requirements have not been set");
+    resource.setMemory(resourceRequirements.getMemory());
+    resource.setVirtualCores(resourceRequirements.getVirtualCores());
+    return resource;
+  }
+
+  /**
+   * Take a consistent (locked) snapshot of the role's counters.
+   * @return a new statistics record
+   */
+  public synchronized RoleStatistics getStatistics() {
+    RoleStatistics stats = new RoleStatistics();
+    stats.activeAA = getOutstandingAARequestCount();
+    stats.actual = actual.get();
+    stats.desired = desired.get();
+    stats.failed = failed.get();
+    stats.limitsExceeded = limitsExceeded.get();
+    stats.nodeFailed = nodeFailed.get();
+    stats.preempted = preempted.get();
+    stats.releasing = releasing.get();
+    stats.requested = requested.get();
+    stats.started = started.get();
+    stats.startFailed = startFailed.get();
+    stats.totalRequested = totalRequested.get();
+    return stats;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java
new file mode 100644
index 0000000..b848096
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.List;
+
+/**
+ * Simplest release selector simply returns the list
+ */
+public class SimpleReleaseSelector implements ContainerReleaseSelector {
+
+ @Override
+ public List<RoleInstance> sortCandidates(int roleId,
+ List<RoleInstance> candidates) {
+ return candidates;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org